目前的项目是一个App应用,后端API采用egg.js框架。 现在有这样一个需求
业务场景:
- API服务发送消息到阿里云消息服务 (MNS)
- 需要一个服务,获取阿里云消息服务(MNS)队列的消息,然后进行相关的业务处理
- 需要多个服务处理不同的业务 (比如发送短信,帖子新回复的提醒)
考虑用egg.js框架的扩展定时任务+阿里云消息服务(MNS)长轮询
代码示例: https://github.com/jerryhu/egg-mns-long-polling-experiment
task部分的代码如下:
'use strict';
module.exports = {
schedule: {
type: 'mns',
},
async task(ctx, message) {
console.log(`task1: ${message}`);
const condition = true;
while (condition) {
try {
// 获取消息
const resp = await ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25);
const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
console.log(`data: ${data}`);
// TODO: 新评论提醒
// 删除消息
await ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', resp.body.ReceiptHandle);
} catch (err) {
if (err.name === 'MNSMessageNotExistError') {
console.log(`没有消息: ${new Date()}`);
continue;
}
console.error(`获取消息出错 ${err.stack}`);
break;
}
}
},
};
问题:
- 扩展定时任务中长时间执行任务是否可行?
- 有没有更优的解决方案(nodejs后端服务应用+阿里云MNS长轮询)?
这代码有问题吧,一会cpu就飙满了,node不太适合这么用,阿里云MNS没提供subscribe机制吗?感觉还是换个思路 这种也比while(true)好吧
module.exports = {
schedule: {
type: 'mns',
},
async handle(ctx, message) {
console.log(`task1: ${message}`);
try {
// 获取消息
const resp = await ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25);
const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
console.log(`data: ${data}`);
// TODO: 新评论提醒
// 删除消息
await ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', resp.body.ReceiptHandle);
} catch (err) {
if (err.name !== 'MNSMessageNotExistError') {
console.error(`获取消息出错 ${err.stack}`);
return;
}
}
setTimeout(() => {
this.handle(ctx);
}, 0);
},
async task(ctx, message) {
this.handle(ctx, message);
},
}
- 阿里云MNS目前好像没有subscribe机制
- 使用递归跟while(true)本质上没区别吧
- CPU是否飙满要看队列的消息量吧? (如果每分钟最多几百个消息,是否会有这问题)
我没有用 while true 用的是递归,目前跑了一年多 是没有问题的 一次执行完成了 才执行下一次的拉取
@jerryhu js是单线程运行的,while(true)会把cpu占满,回调不能执行
@carlisliu 我看了一些文章, while(true)有时候确实会导致事件堵塞 例如以下代码
main();
function main() {
let open = false;
setTimeout(function() {
open = true;
}, 1000);
while (!open) {
console.log('wait');
}
console.log('open sesame');
}
但是我如果修改下代码,就没有了堵塞这个问题
main();
async function main() {
let open = false;
setTimeout(function() {
open = true;
}, 1000);
while (!open) {
await sleep(0);
console.log('wait');
}
console.log('open sesame');
}
async function sleep(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
参考了cheerego的做法,我把while(true)改为了递归调用
'use strict';
const Subscription = require('egg').Subscription;
class Task1 extends Subscription {
static get schedule() {
return {
type: 'mns',
};
}
async subscribe() {
this.logger.info('task1任务开始');
await this.processMessage();
}
// 消息处理
async processMessage() {
const queueName = 'test-post-reminder-queue';
try {
// 获取消息
const resp = await this.app.alicloudMns.receiveMessage(queueName, 25);
const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
console.log(`data: ${data}`);
// TODO: 新评论提醒
// 删除消息
await this.app.alicloudMns.deleteMessage(queueName, resp.body.ReceiptHandle);
} catch (err) {
if (err.name === 'MNSMessageNotExistError') {
console.log(`没有消息: ${new Date()}`);
} else {
console.error(`获取消息出错 ${err.stack}`);
return;
}
}
// 递归调用: 消息处理
await this.processMessage();
}
}
module.exports = Task1;
是不是阿里云的MQ比MNS更合适
egg 的定时任务可以指定间隔时间吧。用框架提供的功能来控制频度是否更好?
这样如何:
import { defer, interval, of } from 'rxjs'
import {
catchError,
concatMap,
delay,
map,
mapTo,
mergeMap,
retry,
shareReplay,
take,
timeout
} from 'rxjs/operators'
module.exports = {
schedule: {
type: 'mns',
},
handle(ctx, message) {
console.log(`task1: ${message}`);
// 获取消息
const body$ = defer(() => ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25)).pipe(
map(resp => {
return {
msgBody: resp.body.MessageBody,
handle: resp.body.ReceiptHandle,
}),
}
timeout(3000), // 请求超时时间 3 秒
retry(2), // 当超时或者请求异常 可最多重试 2 次 (总共 3 次请求)
shareReplay(1),
)
// 执行操作
const msgBody$ = body$.pipe(
map(body => body.msgBody),
map(msgBody => {
return Buffer.from(msgBody, 'base64').toString('utf8')
}),
concatMap(buf => {
// TODO: 新评论提醒
}),
)
// 清理操作
const handle$ = body$.pipe(
map(body => body.handle),
mergeMap(handle => {
return ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', handle)
}),
)
const ret$ = interval(1000).pipe(// 每秒钟触发一个事件
concatMap(() => body$),
concatMap(() => {
return concat(msgBody$, handle$).pipe(
mapTo(null),
)
}),
catchError(err => {
if (err.name !== 'MNSMessageNotExistError') {
console.error(`获取消息出错 ${err.stack}`)
throw err // 退出循环
}
return of(null) // 继续循环
}),
)
},
task(ctx, message) {
return this.handle(ctx, message)
.catch(console.error)
},
}
@464085647 理论上MQ比MNS更合适。 但是在项目中,考虑到以下两点,我选择了MNS
- MNS费用更低
- MNS产品更成熟,接入更简单 (可能是因为我不熟悉消息队列RocketMQ)
- 时间间隔的定时任务也是可行的。但这种方式需要一些处理,保证同一时间只有一个任务在执行
- RxJS/Observable 我不熟悉,所以不能确定你的代码是否可行。代码中的timeout(3000)会不会有问题?长轮询会等待25秒时间
@jerryhu timeout(3000) 这个只是个例子,自己根据实际情况修改超时值甚至不用 timeout() 控制就行了