求助: activemq中的topic,有时会丢失,消息接收端收不到消息
业务逻辑
一共3个项目.其中一个是和wx相关的.他在收到wx发过来的支付成功消息之后,将信息广播出去,另外两个项目监听这个topic,去做对应的处理.
遇到的问题:
另外2个项目会出现接收不到topic的情况.但是不是一直没有,是偶尔出现. 并且这两个项目丢失的情况不是同一个topic丢失了,而是有的topic一个项目监听到了,另外一个没有收到,每个项目都有大约1/5的几率收不到信息,
这种的不知道该如何下手去处理.
/**
* activemq
*
* [stompit](https://github.com/gdaws/node-stomp)
*/
"use strict";
const {activemq} = require('../config/settings').mq;
const stompit = require('stompit');
const connectOptions = {
'host' : activemq.host,
'port' : 61613,
'connectHeaders': {
'host' : '/',
'login' : activemq.user,
'passcode' : activemq.passwd,
'heart-beat': '5000,5000'
}
};
const connectionManager = new stompit.ConnectFailover();
connectionManager.addServer(connectOptions);
// alwarsConnected:
const channel = new stompit.Channel(connectionManager); // 使用这种的会自动重连
// 将任务添加到队列中
exports.publishQueue = publishQueue;
function publishQueue(name, message) {
const sendHeaders = {
'destination' : '/queue/' + name,
'content-type': 'text/plain'
};
log.silly('添加任务: ', name);
channel.send(sendHeaders, message, function (err) {
log.silly('添加任务成功: ', name);
if (err) {
log.error('amq -> publishQueue: ' + err)
}
});
}
// 发送广播
exports.publishTopic = publishTopic;
function publishTopic(name, message) {
const sendHeaders = {
'destination' : '/topic/' + name,
'content-type': 'text/plain'
};
channel.send(sendHeaders, message, function (err) {
log.silly('发送广播成功:', name, message);
if (err) {
log.error('amq -> publishTopic: ' + err)
}
});
}
// 接受广播
exports.subscribeTopic = subscribeTopic;
async function subscribeTopic(name, cb) {
const subscribeHeaders = {
'destination': '/topic/' + name,
'ack' : 'client-individual'
};
channel.subscribe(subscribeHeaders, function (error, message) {
log.silly('接收广播成功: ', name);
if (error) {
return log.error('activemq, subscribeTopic:', error);
}
message.readString('utf-8', function (error, body) {
if (error) {
return log.error('activemq, subscribeTopic: readString: ', error);
}
// channel.ack(message);
typeof cb === 'function' && cb(body);
channel.ack(message);
});
})
}
// 读取任务
exports.subscribeQueue = subscribeQueue;
async function subscribeQueue(name, cb) {
const subscribeHeaders = {
'destination': '/queue/' + name,
'ack' : 'client-individual'
};
channel.subscribe(subscribeHeaders, function (error, message) {
log.silly('读取任务成功: ', name);
if (error) {
return log.error('activemq, subscribeQueue:', error, message);
}
message.readString('utf-8', function (error, body) {
if (error) {
return log.error('activemq, subscribeQueue readString:', error, body);
}
// channel.ack(message);
typeof cb === 'function' && cb(body, channel, message);
// channel.ack(message); // 这个在每个处理完队列中的任务的时候都必须执行,否则系统会认为这个任务没有执行完成,下次重启的时候会重新读出来,并且堆积的也有数量限制
});
})
}