Node.js + Redis Sorted Set 任务队列
发布于 8 年前 作者 DuanPengfei 10714 次浏览 来自 分享

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" },这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

  1. 并行执行多个任务
  2. 任务唯一性
  3. 任务成功或失败后的处理

针对以上问题的解决方案

  1. 并行执行多个任务利用 Promise.all 来实现
  2. 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
  3. 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

// remote_api.js 模拟第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
    setTimeout(() => {
        let arr = [200, 300];  // 200 代表成功,300 代表失败需要重新请求
        res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
    }, 3000);
});

app.listen('9001', () => {
    console.log('API 服务监听端口:9001');
});

// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
    // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
    redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
        if (error) {
            callback(error);
        } else {
            if (task) {
                console.log('任务已存在,不新增相同任务');
                callback(null, task);
            } else {
                redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
                    if (error) {
                        callback(error);
                    } else {
                        callback(null, result);
                    }
                });
            }
        }
    });
}

app.get('/', (req, res) => {
    let taskName = req.query['task-name'];
    addTaskToQueue(taskName, (error, result) => {
        if (error) {
            console.log(error);
        } else {
            res.status(200).send('正在查询中......');
        }
    });
});

app.listen(9002, () => {
    console.log('生产者服务监听端口:9002');
});
// consumer.js 定时获取任务并执行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2;  // 并行执行任务数量

function getTasksFromQueue(callback) {
    // 获取多个任务
    redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
        if (error) {
            callback(error);
        } else {
            // 将任务分值设置为 0,表示正在处理
            if (tasks.length > 0) {
                let tmp = [];
                tasks.forEach((task) => {
                    tmp.push(0);
                    tmp.push(task);
                });
                redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
                    if (error) {
                        callback(error);
                    } else {
                        callback(null, tasks)
                    }
                });
            }
        }
    });
}

function addFailedTaskToQueue(taskName, callback) {
    redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
        if (error) {
            callback(error);
        } else {
            callback(null, result);
        }
    });
}

function removeSucceedTaskFromQueue(taskName, callback) {
    redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
        if (error) {
            callback(error);
        } else {
            callback(null, result);
        }
    })
}

function execTask(taskName) {
    return new Promise((resolve, reject) => {
        let requestOptions = {
            'url': 'http://127.0.0.1:9001',
            'method': 'GET',
            'timeout': 5000
        };
        request(requestOptions, (error, response, body) => {
            if (error) {
                resolve('failed');
                console.log(error);
                addFailedTaskToQueue(taskName, (error) => {
                    if (error) {
                        console.log(error);
                    } else {

                    }
                });
            } else {
                try {
                    body = typeof body !== 'object' ? JSON.parse(body) : body;
                } catch (error) {
                    resolve('failed');
                    console.log(error);
                    addFailedTaskToQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                    return;
                }
                if (body.status !== 200) {
                    resolve('failed');
                    addFailedTaskToQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                } else {
                    resolve('succeed');
                    removeSucceedTaskFromQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                }
            }
        });
    });
}

// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
    console.log('获取新任务');
    getTasksFromQueue((error, tasks) => {
        if (error) {
            console.log(error);
        } else {
            if (tasks.length > 0) {
                console.log(tasks);

                Promise.all(tasks.map(execTask))
                .then((results) => {
                    console.log(results);
                })
                .catch((error) => {
                    console.log(error);
                });
                
            }
        }
    });
});
20 回复

// 更新: 咦,我楼上的呢,怎么没了。。。。原帖如下:

不同意楼上,写 Node 为什么不能缩进 4 格,再就是 if else 怎么了。。很清晰啊。。

谢谢楼主

一般简单的用 list RPUSH + BLPOP 就好啦

@magicdawn 是的,一般情况下可以这样的,我这边的需求是,取出任务时不能移除任务,而是要将任务设置为正在执行状态,避免同一任务执行很多次。如果使用 List,当任务取出时,有相同的新任务进来又会加入队列,又重新执行了一次。我也没有想到什么更好的解决方式,就使用了 Sorted Set 实现了。

@HugoJing 这几天忙着写代码,都没来看,好像错过了什么。

@magicdawn list rpush 这个方案。两个问题,1 是重复任务会被添加 2 是当 node.js 抛错之后,取出的任务在数据库就直接消失了,很不严谨的做法。 @DuanPengfei https://zhuanlan.zhihu.com/p/20293493 《一个简单的 mysql 队列问题》 之前类似的问题我是这么做的。不过这并不是一种推荐的做法,只是我当时是这么做的。

你提的

1. 并行执行多个任务
2. 任务唯一性
3. 任务成功或失败后的处理

三点,我觉得最重要的就是第 2 点。怎么让任务不被 worker 重复拿到。以及还要加上第 4 点,按已被执行的时间排序从而雨露均沾。 第 4 点你是通过 score 做到的。

@DuanPengfei 话说你的方案也存在的问题是:当 node.js 抛错之后,取出的任务在数据库就直接消失了。因为它的 score 为 0 之后,就没人帮它恢复了

??

关于上述提到的执行时间排序的问题,1 是要每次cron运行后,更新任务的时间值 2 是需要恢复过期太久的任务

用 redis 的 score 的话,第2点不容易做到吧?

你现在的架构之上,有两点需要改进地方:

  1. 保证任务被唯一的 worker 拿到
  2. worker 挂逼之后,任务不会被一直搁置。

首先,你需要引入一个 touch time 的概念,这个概念就是说,一个任务被 touch 了,那么别的任务需要避开他。任务每次被 touch 的时间,都通过时间来记录一下。

解决 1 问题,引入 redis 的事务机制貌似也不行,我指的 multi exec 之类的。现在你的做法,zrangebyscore 和 zadd 之间有时间差,没法保证两个操作组合起来的原子性。还是可能多个worker拿到同一批任务。而如果用了 multi 的话,你没法知道自己需要 zadd 的对象是哪些。 解决 2 问题,开始引入 touch time 的概念。每个 worker 拿到任务之后,对任务的 score 设成当前。而取任务时,设置一个

zrangebyscore([QUEUE_NAME, 1, new Date().getTime() - 5 * 1000 这样的时间差。

要保证【保证任务被唯一的 worker 拿到】,就得使得 任务获得+任务touchtime更新 这两个操作被原子性的完成。抽象来说,更新+获取 需要原子性完成。 这一点 redis 应该是做不到了,mysql 能做到。

再回头看这个问题,如果我来做。无论量大量小,我还是觉得我文章里面的方式更好。。。。

redis 除非一个 worker 对应一个QUEUE_NAME,否则做不到原子性。

但 mysql 当量大的时候,也存在竞争的问题,不过我的方案已经是行级锁了,每秒 1w qps 轻松能上去。如果高于这个 qps 的话,获取我也要分成不同的 mysql 表或库,也就是进行水平拆分。

@magicdawn 如果可以容忍数据丢失的话,感觉 list 这个方案还行

关注

来自酷炫的 CNodeMD

既然用了redis, 为什么不用sub和pub

@alsotang 一针见血,这两个问题当时在设计时就是存在的,后来存储方案没有改变,只是不再使用 crontab 方式运行,而是守护进程采取批处理的方式,取出一批过期的处理完了再取一批。我要去看看你的方案,努力学习一下,在这方面没有什么经验,第一次做。

@richenlin sub 和 pub 的方式有个问题就是断开连接后再连接后,断开那一时刻的任务都处理不到了,像 alsotang 说的,我这样处理失败了也会变成僵尸任务,不过我们在每次重启后都会把所有 score 为 0 的处理一遍。

我现在都是用mysql做事务,用行锁保证唯一,不然后面重复执行了会有很多问题

恩恩,MySQL 是比较好的方案,毕竟有原子性操作,估计我这边也会迁到 MySQL 上去

@richenlin sub / pub 连 redis 中的持久化都没有…比list更糟

@magicdawn pub /sub 是跨系统跨平台业务分离的异步事件的通知机制,本身并不能作为队列,需要配合 list或者set 来实现持久化,且能够自动发现和广播。 可以自行实现两个队列,分别是生产/消费,以及失败通知队列

@DuanPengfei 对了,用 redis 会不会还存在持久化的问题。。。

@alsotang 单纯从 redis 上讲会存在持久化问题,但是我们公司的 redis 是怎么做的我也不是很清楚,貌似是不用担心 redis 挂掉的问题,而且我这个对数据一致和安全要求不太高的情况还能用,后续我应该会迁移到 MySQL 上,会再仔细研究和参考资料尽量完善一下

回到顶部