[egg.js] 扩展定时任务与阿里云消息服务MNS长轮询
发布于 6 年前 作者 jerryhu 4128 次浏览 来自 问答

目前的项目是一个App应用,后端API采用egg.js框架。 现在有这样一个需求

业务场景:

  • API服务发送消息到阿里云消息服务 (MNS)
  • 需要一个服务,获取阿里云消息服务(MNS)队列的消息,然后进行相关的业务处理
  • 需要多个服务处理不同的业务 (比如发送短信,帖子新回复的提醒)

考虑用egg.js框架的扩展定时任务+阿里云消息服务(MNS)长轮询

代码示例: https://github.com/jerryhu/egg-mns-long-polling-experiment

task部分的代码如下:

'use strict';
module.exports = {
  schedule: {
    type: 'mns',
  },
  async task(ctx, message) {
    console.log(`task1: ${message}`);

    const condition = true;
    while (condition) {
      try {
        // 获取消息
        const resp = await ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25);
        const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
        console.log(`data: ${data}`);

        // TODO: 新评论提醒

        // 删除消息
        await ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', resp.body.ReceiptHandle);
      } catch (err) {
        if (err.name === 'MNSMessageNotExistError') {
          console.log(`没有消息: ${new Date()}`);
          continue;
        }

        console.error(`获取消息出错 ${err.stack}`);
        break;
      }
    }
  },
};

问题:

  1. 扩展定时任务中长时间执行任务是否可行?
  2. 有没有更优的解决方案(nodejs后端服务应用+阿里云MNS长轮询)?
11 回复

这代码有问题吧,一会cpu就飙满了,node不太适合这么用,阿里云MNS没提供subscribe机制吗?感觉还是换个思路 这种也比while(true)好吧

	module.exports = {
		  schedule: {
			  type: 'mns',
		  },
	  
		  async handle(ctx, message) {
			  console.log(`task1: ${message}`);
	  
			  try {
				  // 获取消息
				  const resp = await ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25);
				  const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
				  console.log(`data: ${data}`);
	  
				  // TODO: 新评论提醒
	  
				  // 删除消息
				  await ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', resp.body.ReceiptHandle);
			  } catch (err) {
				  if (err.name !== 'MNSMessageNotExistError') {
					  console.error(`获取消息出错 ${err.stack}`);
					  return;
				  }
			  }
			  setTimeout(() => {
				  this.handle(ctx);
			  }, 0);
		  },
	  
		  async task(ctx, message) {
			  this.handle(ctx, message);
		  },
	  }
	  
  • 阿里云MNS目前好像没有subscribe机制
  • 使用递归跟while(true)本质上没区别吧
  • CPU是否飙满要看队列的消息量吧? (如果每分钟最多几百个消息,是否会有这问题)

我没有用 while true 用的是递归,目前跑了一年多 是没有问题的 一次执行完成了 才执行下一次的拉取

@jerryhu js是单线程运行的,while(true)会把cpu占满,回调不能执行

@carlisliu 我看了一些文章, while(true)有时候确实会导致事件堵塞 例如以下代码

main();

function main() {
  let open = false;

  setTimeout(function() {
    open = true;
  }, 1000);

  while (!open) {
    console.log('wait');
  }

  console.log('open sesame');
}

但是我如果修改下代码,就没有了堵塞这个问题

main();

async function main() {
  let open = false;

  setTimeout(function() {
    open = true;
  }, 1000);

  while (!open) {
    await sleep(0);
    console.log('wait');
  }

  console.log('open sesame');
}

async function sleep(ms) {
  return new Promise(resolve => {
    setTimeout(resolve, ms);
  });
}

参考了cheerego的做法,我把while(true)改为了递归调用

'use strict';

const Subscription = require('egg').Subscription;

class Task1 extends Subscription {
  static get schedule() {
    return {
      type: 'mns',
    };
  }

  async subscribe() {
    this.logger.info('task1任务开始');

    await this.processMessage();
  }

  // 消息处理
  async processMessage() {
    const queueName = 'test-post-reminder-queue';

    try {
      // 获取消息
      const resp = await this.app.alicloudMns.receiveMessage(queueName, 25);
      const data = Buffer.from(resp.body.MessageBody, 'base64').toString('utf8');
      console.log(`data: ${data}`);

      // TODO: 新评论提醒

      // 删除消息
      await this.app.alicloudMns.deleteMessage(queueName, resp.body.ReceiptHandle);
    } catch (err) {
      if (err.name === 'MNSMessageNotExistError') {
        console.log(`没有消息: ${new Date()}`);
      } else {
        console.error(`获取消息出错 ${err.stack}`);
        return;
      }
    }

    // 递归调用: 消息处理
    await this.processMessage();
  }

}

module.exports = Task1;

是不是阿里云的MQ比MNS更合适

egg 的定时任务可以指定间隔时间吧。用框架提供的功能来控制频度是否更好?

这样如何:

import { defer, interval, of } from 'rxjs'
import {
  catchError,
  concatMap,
  delay,
  map,
  mapTo,
  mergeMap,
  retry,
  shareReplay,
  take,
  timeout
} from 'rxjs/operators'

module.exports = {
  schedule: {
    type: 'mns',
  },

  handle(ctx, message) {
    console.log(`task1: ${message}`);

    // 获取消息
    const body$ = defer(() => ctx.app.alicloudMns.receiveMessage('test-post-reminder-queue', 25)).pipe(
      map(resp => {
        return {
          msgBody: resp.body.MessageBody,
          handle: resp.body.ReceiptHandle,
        }),
      }
      timeout(3000), 		// 请求超时时间 3 秒
      retry(2),					// 当超时或者请求异常 可最多重试 2 次 (总共 3 次请求)
      shareReplay(1),
    )
    // 执行操作
    const msgBody$ = body$.pipe(
      map(body => body.msgBody),
      map(msgBody => {
        return Buffer.from(msgBody, 'base64').toString('utf8')
      }),
      concatMap(buf => {
        // TODO: 新评论提醒
      }),
    )
    // 清理操作
    const handle$ = body$.pipe(
      map(body => body.handle),
      mergeMap(handle => {
        return ctx.app.alicloudMns.deleteMessage('test-post-reminder-queue', handle)
      }),
    )

    const ret$ = interval(1000).pipe(// 每秒钟触发一个事件
      concatMap(() => body$),
      concatMap(() => {
        return concat(msgBody$, handle$).pipe(
          mapTo(null),
        )
      }),
      catchError(err => {
        if (err.name !== 'MNSMessageNotExistError') {
          console.error(`获取消息出错 ${err.stack}`)
          throw err // 退出循环
        }
        return of(null) // 继续循环
      }),
    )
  },

  task(ctx, message) {
    return this.handle(ctx, message)
      .catch(console.error)
  },	
}

@464085647 理论上MQ比MNS更合适。 但是在项目中,考虑到以下两点,我选择了MNS

  • MNS费用更低
  • MNS产品更成熟,接入更简单 (可能是因为我不熟悉消息队列RocketMQ)

@waitingsong

  1. 时间间隔的定时任务也是可行的。但这种方式需要一些处理,保证同一时间只有一个任务在执行
  2. RxJS/Observable 我不熟悉,所以不能确定你的代码是否可行。代码中的timeout(3000)会不会有问题?长轮询会等待25秒时间

@jerryhu timeout(3000) 这个只是个例子,自己根据实际情况修改超时值甚至不用 timeout() 控制就行了

回到顶部