原文链接:http://morning.work/page/2016-05/how-to-write-a-nodejs-redis-client.html
转载请注明出处
前几天写了篇文章《如何用 Node.js 编写一个 API 客户端》(http://morning.work/page/2016-05/how-to-write-a-nodejs-api-client-package.html ),有人说这 不能算是一个 API 客户端,顶多是一个支持 GET / POST 操作的模块 ,羞得我老脸微微一红,故作镇静地自然自语道,简单是简单点了,好歹也是个 API 客户端嘛。
这次要写的这个 Redis 客户端应该算是个客户端了,需要直接发起TCP/IP
连接去跟服务器通讯,需要自己解析客户端返回的结果,还要做一些简单的容错处理,如果要做到足够健壮也不容易,不过就本文要实现一个基本可用的例子来说,还是简单了点。
无论是实现 REST 的 API 客户端还是这样一个 Redis 客户端,虽然具体实现的细节不同,但是,套路还是一样的。二十一世纪行走江湖最重要的是什么?套路!套路!套路!所以呢,本文还是跟之前一样的套路。
Redis 协议
要开始编写一个 Redis 客户端,我们首先要知道怎么去跟 Redis 通讯,比如要执行GET a
应该按照什么样的格式给服务器发送指令,服务器返回的结果又是什么样的格式这些。Redis 协议的详细介绍可以参考这里:http://redis.cn/topics/protocol.html
假如我要执行命令KEYS *
,只要往服务器发送KEYS *\r\n
即可,这时服务器会直接响应结果,返回的结果格式如下:
- 用单行回复,回复的第一个字节将是
+
- 错误消息,回复的第一个字节将是
-
- 整型数字,回复的第一个字节将是
:
- 批量回复,回复的第一个字节将是
$
- 多个批量回复,回复的第一个字节将是
*
每一行都使用\r\n
来分隔。
为了查看具体的返回结果是怎样的,我们可以用nc
命令来测试。假定本机已经运行了 Redis 服务,其监听端口为6379
,我们可以执行以下命令连接:
$ nc 127.0.0.1 6379
如果本机没有nc
命令(比如 Windows 用户),可以使用telnet
命令:
$ telnet 127.0.0.1 6379
下面我们分别测试各个命令返回的结果(其中第一行表示客户端输入的命令,行尾的↵
表示按回车发送,第二行开始表示服务器端返回的内容):
1、返回错误
help ↵
-ERR unknown command 'help'
2、操作成功
set abc 123456 ↵
+OK
3、得到结果
get abc ↵
$6
123456
4、得不到结果
get aaa ↵
$-1
5、得到的结果是整形数字
hlen aaa ↵
:5
6、数组结果
keys a* ↵
*3
$3
abc
$3
aa1
$1
a
7、多命令执行
multi ↵
+OK
get a ↵
+QUEUED
get b ↵
+QUEUED
get c ↵
+QUEUED
exec ↵
*3
$5
hello
$-1
$5
world
解析结果
实现一个 Redis 客户端大概的原理是,客户端依次把需要执行的命令发送给服务器,而服务器会按照先后顺序把结果返回给用户。在本文我们使用 Node.js 内置的net
模块来操作,通过data
事件来接收结果。需要注意的是,有时候结果太长我们可能要几次data
事件才能拿到完整的结果,有时可能是一个data
事件中包含了几条命令的执行结果,也有可能当前命令的结果还没有传输完,剩下一半的结果在下一个data
事件中。
为了方便调试,我们将解析结果的部分独立封装成一个函数,接口如下:
const proto = new RedisProto();
// 接受到数据
proto.push('*3\r\n$3\r\nabc\r\n$3\r\naa1\r\n$1\r\na\r\n');
proto.push('$6\r\n123456\r\n');
proto.push('-ERR unknown command \'help\'\r\n');
proto.push('+OK\r\n');
proto.push(':5\r\n');
proto.push('*3\r\n$5\r\nhe');
proto.push('llo\r\n$-');
proto.push('1\r\n$5\r\nworld\r\n');
while (proto.next()) {
// proto.next() 如果有解析出完整的结果则返回结果,没有则返回false
// 另外可以通过 proto.result 获得
console.log(proto.result);
}
接下来开始编写相应的代码。
按照套路,我们先初始化项目:
$ mkdir redis_client
$ cd redis_client
$ git init
$ npm init
新建文件proto.js
:
'use strict';
/**
* 简单Redis客户端
*
* @author Zongmin Lei <leizongmin@gmail.com>
*/
class RedisProto {
constructor() {
this._lines = []; // 已初步解析出来的行
this._text = ''; // 剩余不能构成一行的文本
}
// 将收到的数据添加到缓冲区
push(text) {
// 将结果按照\r\n分隔
const lines = (this._text + text).split('\r\n');
// 如果结尾是\r\n,那么数组最后一个元素肯定是一个空字符串
// 否则,我们应该将剩余的部分跟下一个data事件接收到的数据连起来
this._text = lines.pop();
this._lines = this._lines.concat(lines);
}
// 解析下一个结果,如果没有则返回null
next() {
const lines = this._lines;
const first = lines[0];
// 去掉指定数量的行,并且返回结果
const popResult = (lineNumber, result) => {
this._lines = this._lines.slice(lineNumber);
return this.result = result;
};
// 返回空结果
const popEmpty = () => {
return this.result = false;
};
if (lines.length < 1) return popEmpty();
switch (first[0]) {
case '+':
return popResult(1, {data: first.slice(1)});
case '-':
return popResult(1, {error: first.slice(1)});
case ':':
return popResult(1, {data: Number(first.slice(1))});
case '$': {
const n = Number(first.slice(1));
if (n === -1) {
// 如果是 $-1 表示空结果
return popResult(1, {data: null});
} else {
// 否则取后面一行作为结果
const second = lines[1];
if (typeof second !== 'undefined') {
return popResult(2, {data: second});
} else {
return popEmpty();
}
}
}
case '*': {
const n = Number(first.slice(1));
if (n === 0) {
return popResult(1, {data: []});
} else {
const array = [];
let i = 1;
for (; i < lines.length && array.length < n; i++) {
const a = lines[i];
const b = lines[i + 1];
if (a.slice(0, 3) === '$-1') {
array.push(null);
} else if (a[0] === ':') {
array.push(Number(a.slice(1)));
} else {
if (typeof b !== 'undefined') {
array.push(b);
i++;
} else {
return popEmpty();
}
}
}
if (array.length === n) {
return popResult(i, {data: array});
} else {
return popEmpty();
}
}
}
default:
return popEmpty();
}
}
}
module.exports = RedisProto;
执行上文中的测试代码可得到如下结果:
{ data: '123456' }
{ data: [ 'abc', 'aa1', 'a' ] }
{ error: 'ERR unknown command \'help\'' }
{ data: 'OK' }
{ data: 5 }
{ data: [ 'hello', null, 'world' ] }
实现 Redis 客户端
上文我们已经实现了一个简单的解析器,其可以通过push()
将接收到的数据片段加进去,然后我们只需要不断地调用next()
来获取下一个解析出来的结果即可,直到其返回false
,在下一次收到数据时,重复刚才的动作。
新建文件index.js
:
'use strict';
/**
* 简单Redis客户端
*
* @author Zongmin Lei <leizongmin@gmail.com>
*/
const events = require('events');
const net = require('net');
const RedisProto = require('./proto');
class Redis extends events.EventEmitter {
constructor(options) {
super();
// 默认连接配置
options = options || {};
options.host = options.host || '127.0.0.1';
options.port = options.port || 6379;
this.options = options;
// 连接状态
this._isClosed = false;
this._isConnected = false;
// 回调函数列表
this._callbacks = [];
this._proto = new RedisProto();
this.connection = net.createConnection(options.port, options.host, () => {
this._isConnected = true;
this.emit('connect');
});
this.connection.on('error', err => {
this.emit('error', err);
});
this.connection.on('close', () => {
this._isClosed = true;
this.emit('close');
});
this.connection.on('end', () => {
this.emit('end');
});
this.connection.on('data', data => {
this._pushData(data);
});
}
// 发送命令给服务器
sendCommand(cmd, callback) {
return new Promise((resolve, reject) => {
const cb = (err, ret) => {
callback && callback(err, ret);
err ? reject(err) : resolve(ret);
};
// 如果当前连接已断开,直接返回错误
if (this._isClosed) {
return cb(new Error('connection has been closed'));
}
// 将回调函数添加到队列
this._callbacks.push(cb);
// 发送命令
this.connection.write(`${cmd}\r\n`);
});
}
// 接收到数据,循环结果
_pushData(data) {
this._proto.push(data);
while (this._proto.next()) {
const result = this._proto.result;
const cb = this._callbacks.shift();
if (result.error) {
cb(new Error(result.error));
} else {
cb(null, result.data);
}
}
}
// 关闭连接
end() {
this.connection.destroy();
}
}
module.exports = Redis;
说明:
- 每次
data
事件接收到结果时,直接将其push()
到RedisProto
中,并尝试执行next()
获得结果 - 因为命令的执行结果都是按照顺序返回的,所以我们只需要按顺序从
this._callbacks
中取出最前面的元素,直接执行回调 - 如果连接已经断开,则不允许再执行命令,直接返回
connection has been closed
错误 sendCommand()
同时支持callback
和promise
方式的回调,但是套路跟上一篇文章《如何用 Node.js 编写一个 API 客户端》稍有不同
新建测试文件test.js
:
'use strict';
const Redis = require('./index');
const client = new Redis();
client.sendCommand('GET a', (err, ret) => {
console.log('a=%s, err=%s', ret, err);
});
client.sendCommand('GET b', (err, ret) => {
console.log('b=%s, err=%s', ret, err);
});
client.sendCommand('KEYS *IO*', (err, ret) => {
console.log('KEYS *IO*=%s, err=%s', ret, err);
});
client.sendCommand('OOXX', (err, ret) => {
console.log('OOXX=%s, err=%s', ret, err);
});
client.sendCommand('SET a ' + Date.now())
.then(ret => console.log('success', ret))
.catch(err => console.log('error', err))
.then(() => client.end());
执行测试文件node test.js
可看到类似如下的结果:
a=1463041835231, err=null
b=null, err=null
KEYS *IO*=sess:cz5F-npwOnw0FmesT6JjqJPL13IO8AzV,sess:NS90IkF6uZNAm-FPEAWXHuh3JrIW1-IO, err=null
OOXX=undefined, err=Error: ERR unknown command 'OOXX'
success OK
从结果中可以看出我们这个 Redis 客户端已经基本能用了。
更友好的接口
上文我们实现了一个sendCommand()
方法,理论上可以通过该方法执行任意的 Redis 命令,但是我们可能更希望每条命令有一个对应的方法,比如sendCommand('GET a')
我们可以写成get('a')
,这样看起来会更直观。
首先在index.js
文件头部载入fs
和path
模块:
const fs = require('fs');
const path = require('path');
然后给Redis
类增加_bindCommands()
方法:
_bindCommands() {
const self = this;
// 绑定命令
const bind = (cmd) => {
return function () {
let args = Array.prototype.slice.call(arguments);
let callback;
if (typeof args[args.length - 1] === 'function') {
callback = args.pop();
}
// 每个参数使用空格分隔
args = args.map(item => Array.isArray(item) ? item.join(' ') : item).join(' ');
return self.sendCommand(`${cmd} ${args}`, callback);
};
};
// 从文件读取命令列表
const cmdList = fs.readFileSync(path.resolve(__dirname, 'cmd.txt')).toString().split('\n');
for (const cmd of cmdList) {
// 同时支持大写和小写的函数名
this[cmd.toLowerCase()] = this[cmd.toUpperCase()] = bind(cmd);
}
}
然后在Redis
类的constructor()
方法尾部增加以下代码:
this._bindCommands();
由于在_bindCommands()
中通过读取cmd.txt
文件来读取 Redis 的命令列表,所以还需要新建文件cmd.txt
,内容格式为每条命令一行(由于篇幅限制,本文只列出需要用到的几条命令):
GET
SET
AUTH
MULTI
EXEC
KEYS
把测试文件test.js
改为以下代码:
'use strict';
const Redis = require('./index');
const client = new Redis();
client.get('a', (err, ret) => {
console.log('a=%s, err=%s', ret, err);
});
client.get('b', (err, ret) => {
console.log('b=%s, err=%s', ret, err);
});
client.keys('*IO*', (err, ret) => {
console.log('KEYS *IO*=%s, err=%s', ret, err);
});
client.set('a', Date.now())
.then(ret => console.log('success', ret))
.catch(err => console.log('error', err))
.then(() => client.end())
重新执行node test.js
可看到结果跟上文还是一致的。
简单容错处理
假如将测试文件test.js
改为这样:
'use strict';
const Redis = require('./index');
const client = new Redis();
client.get('a', (err, ret) => {
console.log('a=%s, err=%s', ret, err);
client.end();
client.get('b', (err, ret) => {
console.log('b=%s, err=%s', ret, err);
});
});
在完成get('a')
的时候,我们执行client.end()
关闭了连接,然后再执行get('b')
,大多数情况下将会得到如下的结果:
a=1463042964235, err=null
而get('b')
的回调函数并没有被执行,因为我们在关闭连接后,再也没有收到服务端返回的结果。另外也有可能是因为其他原因,客户端与服务端的连接断开了,此时我们应该能执行回调并返回一个错误。
在文件index.js
中给Redis
类增加一个方法_callbackAll()
:
_callbackAll() {
for (const cb of this._callbacks) {
cb(new Error('connection has been closed'));
}
this._callbacks = [];
}
另外,在constructor()
方法内,将监听连接的close
事件部分代码改成这样:
this.connection.on('close', () => {
this._isClosed = true;
this.emit('close');
this._callbackAll();
});
重新执行node test.js
,从执行结果可看出所有回调函数均已被执行:
a=1463042964235, err=null
b=undefined, err=Error: connection has been closed
还存在的问题
看起来这个模块已经能正常使用了,但是其实并不完善。跟NPM上的ioredis
模块起来还存在以下问题:
- 不支持
multi()
命令 - 不支持
publish
和subscribe
命令 - 不能解析更复杂的返回结果,比如
command
命令的返回结果 - 不支持更多的连接选项,比如密码验证
- 可能存在unicode字符被截断问题
- 因为结果是通过
\r\n
来分行的,如果一条数据里面本身包含\r\n
字符,可能会解析出错 - 没有严格的测试,假如服务端返回了一个非预期的格式,我也不知道程序会咋样
RedisProto
解析结果的算法还是可以优化的,目前这个只能算是大概能用- 如果连接意外断开了,我们可能希望能自动重新连接而不是直接报错
好了,剩下的就交给你啦。
参考链接
很有意思的招聘,可惜不招前端
不错,赞一个。
都是干货,cool!
新手就问一下,后端要从哪些地方入手比较好。。
雷总是要成为第二个轮子哥的节奏。。
老雷你那么牛,为什么不来我的团队。
get √ 多谢分享
老雷你那么牛,为什么不来我的团队。
@sofish @JacksonTian 你们啥时候成立广州研发根据地?
总之是学到东西了,一不小心跟着老雷学了好多东西……
感觉又是一次误导。 如果面试题是这样的,说明这个面试的人根本不知道如何面试一个人。
@calidion 求指教:这样存在什么问题,应该怎么面试?
首先,这样的面试并不全面,考察的点太窄。 其次,处理一个问题可以有多种办法,也许他的回答并不是跟你的思路是一致的。 再次,对于问题域的理解需要更多的时间,你问的问题越细,所能考察的人数越有限,知识面越有限。 然后,面试应该是一次综合的考查,并不完全是考察代码。 还有,解决你所描述的问题,花费的时间太长,如果你想找到合适的人选择,你一天最多面试两个,跟一天面试20个,那个效率高? 最后,如果你面试通过对话都不能了解对方,你是不是在评价能力上有问题?那么让你招聘本身是不是就存在问题。
题外的,即使他写出来你满意的代码,他的人让你不爽,写这么多代码又有毛用?面试也不完全是考察技术,技术再牛跟你不搭也是没用的。
面试是一个交流了解的过程,并不是炫技术的过程,也不需要难倒面试者。。 一方面是知识面的考察,一方面的代码能力的考察。 你要准备100%个知识点。然后设定技术门槛,比如60%通过表示技术过关。 写复杂代码在面试过程中是没有意义的。 据我的经验,如果事先没有准备,90%以上的人是写不好冒泡排序的。 但是这些不影响他们写出来正确的代码,不影响他们有正确的逻辑分析能力。 当然最重要的是不影响你对他们的技术能力的评价。
@calidion 首先,整个面试过程并不是一上来就写代码做题,是通过QQ聊了多次,且面试的时候也当面聊过,这只是最后一个环节 其次,这个面试题是针对特定一个人的,根据之前了解的结果设计的,并不是一套题通吃 再次,这确实是一个很花费时间的题目,而且也不如你想要的那样一天面试20个的高效率,整个下午只面试了一个人 还有,这样的面试题目只是为了了解对方在Node.js后端项目编程的能力,题目本身也没有标准答案,也不管对方能不能做出来,其出发点并不是为了难道面试者
最后,你以上所有的判断都是基于一个不了解背景的前提下得出的结论
题外的,当时面试的情景与本文并没有直接的关系,我也没有在此透露更多细节。如果你觉得写这样的一篇文章是在炫技术,我也不知道该说什么好
在作为面试官的角色我确实还缺乏经验,也非常感谢你能分享你的心得 你所说的观点我大部分是认同的,但依然对你不了解具体情况的前提下乱贴标签的行为感到不爽
我很高兴你找到愿意跟你这样玩的技术人员。但是我不认为你这种面试是一种有效的面试。
你可以很个性化的去面试,但是我不认为这个具备推广价值。
我对你的特定背景我是不了解的,但是我了解如何去评价一个技术人员。
我不知道贴了什么标签,我只是质疑这样面试是不是有效。
如果标题只是如何编写redis客户端,我想很多人会喜欢这种分享。
我也不会在任何异议,但是写着面试题,我觉得会有一定的误导。
@calidion 我并无意推广这样的面试方式,前面已经强调,这文章与面试无太大的联系。当然为了避免你再次看到这样的标题而引起不快,已经将该部分敏感内容删除了。
谢谢分享,感谢理解。
Mark 自豪地采用 CNodeJS ionic
学习了
来自酷炫的 CNodeMD
好文,自己tcp/ip协议理解还比较弱,还要再学习一下。
老雷,我自己写了一个demo,发现一个问题,redis在发送完数据之后,会关闭tcp连接,请问这是怎么回事?我想让客户端和redis的tcp连接一直保持长连接,该怎么办?麻烦你有空的时候,帮我看一下。代码如下
const net = require("net");
const client = net.createConnection({port:6379,host:'127.0.0.1'},()=>{
var data = "";
client.on('data',(chunk)=>{
data += chunk;
});
client.on('end',()=>{
console.log("data:",data);
console.log('end');
});
client.on('close',()=>{
console.log('close');
});
client.on('error',()=>{
console.log('error');
});
client.end('get name\r\n');
});
输出结果如图:
@muyoucun557 你使用的是client.end('get name\r\n');
来发送命令,在命令发送完毕之后是你这一段主动关闭了传输通道,所以Redis服务端也关闭了它那一端。你应该改成client.send('get name\r\n');
@leizongmin 谢谢老雷。还是得多看文档,QAQ
@leizongmin 看老雷的这篇文章,处处都是惊喜(^__^) 嘻嘻……