求助: activemq中的topic,有时会丢失,消息接收端收不到消息
发布于 5 年前 作者 zhilongyan 4439 次浏览 来自 问答

业务逻辑

一共3个项目.其中一个是和wx相关的.他在收到wx发过来的支付成功消息之后,将信息广播出去,另外两个项目监听这个topic,去做对应的处理.

遇到的问题:

另外2个项目会出现接收不到topic的情况.但是不是一直没有,是偶尔出现. 并且这两个项目丢失的情况不是同一个topic丢失了,而是有的topic一个项目监听到了,另外一个没有收到,每个项目都有大约1/5的几率收不到信息,

这种的不知道该如何下手去处理.

/**
 * activemq
 *
 * [stompit](https://github.com/gdaws/node-stomp)
 */
"use strict";

const {activemq} = require('../config/settings').mq;

const stompit = require('stompit');

const connectOptions = {
  'host'          : activemq.host,
  'port'          : 61613,
  'connectHeaders': {
    'host'      : '/',
    'login'     : activemq.user,
    'passcode'  : activemq.passwd,
    'heart-beat': '5000,5000'
  }
};

const connectionManager = new stompit.ConnectFailover();

connectionManager.addServer(connectOptions);

// alwarsConnected:
const channel = new stompit.Channel(connectionManager);  // 使用这种的会自动重连

// 将任务添加到队列中
exports.publishQueue = publishQueue;

function publishQueue(name, message) {
  const sendHeaders = {
    'destination' : '/queue/' + name,
    'content-type': 'text/plain'
  };
  
  log.silly('添加任务: ', name);
  channel.send(sendHeaders, message, function (err) {
    log.silly('添加任务成功: ', name);
    if (err) {
      log.error('amq -> publishQueue: ' + err)
    }
  });
}

// 发送广播
exports.publishTopic = publishTopic;

function publishTopic(name, message) {
  
  const sendHeaders = {
    'destination' : '/topic/' + name,
    'content-type': 'text/plain'
  };
  
  channel.send(sendHeaders, message, function (err) {
    log.silly('发送广播成功:', name, message);
    if (err) {
      log.error('amq -> publishTopic: ' + err)
    }
  });
}

// 接受广播
exports.subscribeTopic = subscribeTopic;

async function subscribeTopic(name, cb) {
  
  const subscribeHeaders = {
    'destination': '/topic/' + name,
    'ack'        : 'client-individual'
  };
  
  channel.subscribe(subscribeHeaders, function (error, message) {
    log.silly('接收广播成功: ', name);
    if (error) {
      return log.error('activemq, subscribeTopic:', error);
    }
    message.readString('utf-8', function (error, body) {
      if (error) {
        return log.error('activemq, subscribeTopic: readString: ', error);
      }
      // channel.ack(message);
      typeof cb === 'function' && cb(body);
      channel.ack(message);
    });
  })
}

// 读取任务
exports.subscribeQueue = subscribeQueue;

async function subscribeQueue(name, cb) {
  const subscribeHeaders = {
    'destination': '/queue/' + name,
    'ack'        : 'client-individual'
  };
  
  channel.subscribe(subscribeHeaders, function (error, message) {
    log.silly('读取任务成功: ', name);
    if (error) {
      return log.error('activemq, subscribeQueue:', error, message);
    }
    message.readString('utf-8', function (error, body) {
      if (error) {
        return log.error('activemq, subscribeQueue readString:', error, body);
      }
      // channel.ack(message);
      typeof cb === 'function' && cb(body, channel, message);
      // channel.ack(message);  // 这个在每个处理完队列中的任务的时候都必须执行,否则系统会认为这个任务没有执行完成,下次重启的时候会重新读出来,并且堆积的也有数量限制
    });
  })
}



回到顶部