egg集成消息队列遇到的问题
发布于 5 年前 作者 acodercat 5527 次浏览 来自 问答

在用egg集成rabbitmq的时候,我希望使用egg自带的Subscription抽象来实现。首先我在agent进程里面创建链接和信道。 在agent.js里用如下代码创建链接和信道。并且消费队列。

module.exports = agent => {
    amqp(agent);
    class AMQPScheduleStrategy extends agent.ScheduleStrategy {
        async start() {
            // const channel = await this.agent.amqp.channel;
            const client = await amqplib.connect('amqp://admin:adminglzt@192.168.1.115');
            const channel = await client.createChannel();
            channel.assertQueue(this.schedule.queue);
            channel.consume(this.schedule.queue, (msg) => {
                // this.agent.amqp.channel.ack(msg);
                this.sendOne(msg);
            });
            // channel.close();
        }
    }
    agent.schedule.use('amqp', AMQPScheduleStrategy);
};

有一个致命的问题就是在worker进程里面需要对消息进行确认,那么这个确认需要channel实例,但是这个对象是没办法传递到worker进程里面的

6 回复

@atian25 确认消息需要这样

await channel.ack(msg);

为什么要在 worker 里面确认?如何确认? worker 是服务用户请求的。

@atian25 当消息收到以后,确认被正确处理之后就告诉rabbitmq,这样就不会再次收到这条数据了。我在Subscription的subscribe中确认,因为在这里处理消息

const Subscription = require('egg').Subscription;

class Tasks extends Subscription {
  static get schedule() {
    return {
      type: 'amqp',
      queue: 'asdfg',
      // env: ['prod'],
      // disable: true,
    };
  }

  async subscribe(msg) {
  // 这里确认
    await this.app.amqp.channel.ack(msg);
    console.log(msg);
  }
}

module.exports = Tasks;

schedule 这边做完后,再发一条消息给 agent 那边去确认?

@atian25 这个办法可以,通过sendToAgent通知agent来做消息确认操作。

回到顶部