精华 写了个用 redis 做消息(任务)队列的包——redis-as-queue
发布于 10 年前 作者 XadillaX 16015 次浏览 最后一次编辑是 8 年前 来自 分享

较之于其它包有一个功能点不一样,就是分 Normal QueueUnique QueueNormal Queue 就是一个普通的先进先出队列,而 Unique Queue 就是一个唯一的队列,当我们往 Unique Queue 中插入一个已有的消息时,你可以自主选择是更新这个消息到队尾还是不做任何事情。

包名叫 redis-as-queue,仓库地址在:https://github.com/XadillaX/node-redis-as-queue,求 star,求 follow。

所以安装只需要:

$ npm install redis-as-queue

最后为了凑字数帖上流氓的 README:


Usage

There are two kinds of queue - normal queue and unique queue.

Each value in unique queue is unique. If a new value that equals any value in the queue, you may choose to update or not.

First you should require this package:

var raq = require("redis-as-queue");

Normal Queue

Create

var normalQueue = new raq.NormalQueue(QUEUE_NAME, [...]);

Note: [...] is the option(s) to connect redis server. Refer to node-redis document.

Eg.

new raq.NormalQueue(QUEUE_NAME, 6379, '127.0.0.1', {});
new raq.NormalQueue(QUEUE_NAME, unix_socket, options);
...

Push

To push a message to your queue, you may use this function:

normalQueue.push("YOUR MESSAGE", function(err) {
    console.log(err);
});

Get

To get message(s) from your redis queue, you may use get function.

Get earlist one message
normalQueue.get(function(err, messages) {
    console.log(err);
    if(messages.length) console.log(messages[0]);
});
Get earlist N message(s)
// This call gets 10 earlist messages
normalQueue.get(10, function(err, messages) {
    console.log(err);
    for(var i = 0; i < messages.length; i++) console.log(messages[i]);
});
Get all message(s)
normalQueue.get(-1, function(err, messages) {
    console.log(err);
    for(var i = 0; i < messages.length; i++) console.log(messages[i]);
});

Remove

To remove message(s) from your redis queue, you may use removeAmount function.

Remove earlist one message
normalQueue.removeAmount(function(err) {
    console.log(err);
});
Remove earlist N message(s)
normalQueue.removeAmount(5, function(err) {
    console.log(err);
});
Remove all the message(s)
normalQueue.removeAmount(-1, function(err) {
    console.log(err);
});

Length

Get the length of current queue.

normalQueue.length(function(err, len) {
    console.log(len);
});

Unique Queue

Each message in unique queue is unique.

Create

var uniqueQueue = new raq.UniqueQueue(QUEUE_NAME, [...]);

Note: [...] is the option(s) to connect redis server. Refer to node-redis document.

Eg.

new raq.UniqueQueue(QUEUE_NAME, 6379, '127.0.0.1', {});
new raq.UniqueQueue(QUEUE_NAME, unix_socket, options);
...

Push

Push a message to the queue.

Need update

This call will check if your message is existing. If existing, it will move the previous message to the end of queue. Otherwise, your message will be pushed at the end of queue.

uniqueQueue.push("YOUR_MESSAGE", true, function(err) {
    console.log(err);
});
Do not update

This call will check if your message is existing. If existing, it won’t do anything. Otherwise, your message will be pushed at the end of queue.

uniqueQueue.push("YOUR_MESSAGE", function(err) {
    console.log(err);
});

// or you can do this

uniqueQueue.push("YOUR_MESSAGE", false, function(err) {
    console.log(err);
});

Get

Get first message
uniqueQueue.get(function(err, messages) {
    console.log(err);
    if(messages.length) {
        console.log(messages[0].message);
        console.log(messages[0].updatedAt);
    }
});
Get first N message(s)
uniqueQueue.get(3, function(err, messages) {
    console.log(err);
    for(var i = 0; i < messages.length; i++) {
        console.log(messages[i].message);
        console.log(messages[i].updatedAt);
    }
});
Get all the message(s)
uniqueQueue.get(-1, function(err, messages) {
    console.log(err);
    for(var i = 0; i < messages.length; i++) {
        console.log(messages[i].message);
        console.log(messages[i].updatedAt);
    }
});

Remove Amount

Refer to remove section in Normal Queue.

Remove Messages

Remove one or more certain message(s) in the queue.

uniqueQueue.get(10, function(err, messages) {
    for(var i = 0; i < 5; i++) messages.shift();

    // Remove Messages
    uniqueQueue.removeMessages(messages, function(err, removeCount, notRemovedMessages) {
        console.log(err);
        console.log(removeCount);
        for(var i = 0; i < notRemovedMessages.length; i++) console.log(notRemovedMessages[i].message);
    });
});

Length

Refer to length section in Normal Queue.

Common Functions

Delete Queue

Delete this queue key and values in redis.

uniqueQueue.deleteQueue(function() {});
normalQueue.deleteQueue(function() {});

Destroy Queue Object

Let the queue object disconnect from redis server and let this instance can’t do any other thing any more.

uniqueQueue.destroy();
normalQueue.destroy();
12 回复

顶!看了一下代码,之前也利用redis的list写过类似但是没有你这么全的queue,想问一下,如果我需要一次性push出来多个元素的需求,有什么好的方法吗,redis的lis貌似没有提供这样的方法。

@showen 你指的 push 出来是 pop 或者获取的意思么😒

nodejs本身是不是做一个队列呢?

var container=[]; onConnect(function(){ onData(‘push’,function(data){ container.push(data); }); onData(“pull”,function(){ return container.pop(); } });

@yakczh 要考虑到分布式和持久化。如果你就 node 做一个内部的队列的话就无法分布式了。如果用 node 做一个队列服务的化,要你自己写各种持久化操作各种数据相关东西,不然服务万一挂了,下次重启的时候原来队列里的数据不是都没了?

既然有现成的方案在,我们为什么不在他上面做一层简单的封装来得方便些呢?

@xadillax 嗯 笔误哈 就是指一次性pop出来多个 我觉得redis的list应该提供这样的接口啊 不然如果queue非常大的话每次都是pop出来一个性能极低

@showen 有类似接口的,你看看我的代码就知道了。获取多个然后删除多个。

uniqueQueue内部采用有序集合,顶一个

正在找这个,用用看。

路过,看到这个觉得还是不错啊,mark

数据入队列后, 取队列的话,怎么能矜夸监听到有数据了, 需要处理呢

希望楼主出现,看了代码,希望有些问题质疑,给个联系QQ啥的就好

我在测试uniqueQueue.push的时候,提示没有zadd这个方法,看了一下代码,我这里在async中self.redis的值是undefined,我需要在进入async前把redis的值保存下来,在async中才可以调用成功,原因有些搞不懂

回到顶部