egg集成消息队列遇到的问题
在用egg集成rabbitmq的时候,我希望使用egg自带的Subscription抽象来实现。首先我在agent进程里面创建链接和信道。 在agent.js里用如下代码创建链接和信道。并且消费队列。
module.exports = agent => {
amqp(agent);
class AMQPScheduleStrategy extends agent.ScheduleStrategy {
async start() {
// const channel = await this.agent.amqp.channel;
const client = await amqplib.connect('amqp://admin:adminglzt@192.168.1.115');
const channel = await client.createChannel();
channel.assertQueue(this.schedule.queue);
channel.consume(this.schedule.queue, (msg) => {
// this.agent.amqp.channel.ack(msg);
this.sendOne(msg);
});
// channel.close();
}
}
agent.schedule.use('amqp', AMQPScheduleStrategy);
};
有一个致命的问题就是在worker进程里面需要对消息进行确认,那么这个确认需要channel
实例,但是这个对象是没办法传递到worker进程里面的
6 回复
@atian25 求解
@atian25 确认消息需要这样
await channel.ack(msg);
为什么要在 worker 里面确认?如何确认? worker 是服务用户请求的。
@atian25 当消息收到以后,确认被正确处理之后就告诉rabbitmq,这样就不会再次收到这条数据了。我在Subscription的subscribe中确认,因为在这里处理消息
const Subscription = require('egg').Subscription;
class Tasks extends Subscription {
static get schedule() {
return {
type: 'amqp',
queue: 'asdfg',
// env: ['prod'],
// disable: true,
};
}
async subscribe(msg) {
// 这里确认
await this.app.amqp.channel.ack(msg);
console.log(msg);
}
}
module.exports = Tasks;
schedule 这边做完后,再发一条消息给 agent 那边去确认?
@atian25 这个办法可以,通过sendToAgent通知agent来做消息确认操作。