精华 编写一个简单的 Redis 客户端
发布于 8 年前 作者 leizongmin 9134 次浏览 来自 分享

原文链接:http://morning.work/page/2016-05/how-to-write-a-nodejs-redis-client.html

转载请注明出处


前几天写了篇文章《如何用 Node.js 编写一个 API 客户端》http://morning.work/page/2016-05/how-to-write-a-nodejs-api-client-package.html ),有人说这 不能算是一个 API 客户端,顶多是一个支持 GET / POST 操作的模块 ,羞得我老脸微微一红,故作镇静地自然自语道,简单是简单点了,好歹也是个 API 客户端嘛。

这次要写的这个 Redis 客户端应该算是个客户端了,需要直接发起TCP/IP连接去跟服务器通讯,需要自己解析客户端返回的结果,还要做一些简单的容错处理,如果要做到足够健壮也不容易,不过就本文要实现一个基本可用的例子来说,还是简单了点。

无论是实现 REST 的 API 客户端还是这样一个 Redis 客户端,虽然具体实现的细节不同,但是,套路还是一样的。二十一世纪行走江湖最重要的是什么?套路!套路!套路!所以呢,本文还是跟之前一样的套路。

Redis 协议

要开始编写一个 Redis 客户端,我们首先要知道怎么去跟 Redis 通讯,比如要执行GET a应该按照什么样的格式给服务器发送指令,服务器返回的结果又是什么样的格式这些。Redis 协议的详细介绍可以参考这里:http://redis.cn/topics/protocol.html

假如我要执行命令KEYS *,只要往服务器发送KEYS *\r\n即可,这时服务器会直接响应结果,返回的结果格式如下:

  • 用单行回复,回复的第一个字节将是+
  • 错误消息,回复的第一个字节将是-
  • 整型数字,回复的第一个字节将是:
  • 批量回复,回复的第一个字节将是$
  • 多个批量回复,回复的第一个字节将是*

每一行都使用\r\n来分隔。

为了查看具体的返回结果是怎样的,我们可以用nc命令来测试。假定本机已经运行了 Redis 服务,其监听端口为6379,我们可以执行以下命令连接:

$ nc 127.0.0.1 6379

如果本机没有nc命令(比如 Windows 用户),可以使用telnet命令:

$ telnet 127.0.0.1 6379

下面我们分别测试各个命令返回的结果(其中第一行表示客户端输入的命令,行尾的表示按回车发送,第二行开始表示服务器端返回的内容):

1、返回错误

help ↵

-ERR unknown command 'help'

2、操作成功

set abc 123456 ↵

+OK

3、得到结果

get abc ↵

$6
123456

4、得不到结果

get aaa ↵

$-1

5、得到的结果是整形数字

hlen aaa ↵

:5

6、数组结果

keys a* ↵

*3
$3
abc
$3
aa1
$1
a

7、多命令执行

multi ↵

+OK

get a ↵

+QUEUED

get b ↵

+QUEUED

get c ↵

+QUEUED

exec ↵

*3
$5
hello
$-1
$5
world

解析结果

实现一个 Redis 客户端大概的原理是,客户端依次把需要执行的命令发送给服务器,而服务器会按照先后顺序把结果返回给用户。在本文我们使用 Node.js 内置的net模块来操作,通过data事件来接收结果。需要注意的是,有时候结果太长我们可能要几次data事件才能拿到完整的结果,有时可能是一个data事件中包含了几条命令的执行结果,也有可能当前命令的结果还没有传输完,剩下一半的结果在下一个data事件中。

为了方便调试,我们将解析结果的部分独立封装成一个函数,接口如下:

const proto = new RedisProto();

// 接受到数据
proto.push('*3\r\n$3\r\nabc\r\n$3\r\naa1\r\n$1\r\na\r\n');
proto.push('$6\r\n123456\r\n');
proto.push('-ERR unknown command \'help\'\r\n');
proto.push('+OK\r\n');
proto.push(':5\r\n');
proto.push('*3\r\n$5\r\nhe');
proto.push('llo\r\n$-');
proto.push('1\r\n$5\r\nworld\r\n');

while (proto.next()) {
  // proto.next() 如果有解析出完整的结果则返回结果,没有则返回false
  // 另外可以通过 proto.result 获得
  console.log(proto.result);
}

接下来开始编写相应的代码。

按照套路,我们先初始化项目:

$ mkdir redis_client
$ cd redis_client
$ git init
$ npm init

新建文件proto.js

'use strict';

/**
 * 简单Redis客户端
 *
 * @author Zongmin Lei <leizongmin@gmail.com>
 */

class RedisProto {

  constructor() {

    this._lines = []; // 已初步解析出来的行
    this._text = '';  // 剩余不能构成一行的文本

  }

  // 将收到的数据添加到缓冲区
  push(text) {

    // 将结果按照\r\n分隔
    const lines = (this._text + text).split('\r\n');
    // 如果结尾是\r\n,那么数组最后一个元素肯定是一个空字符串
    // 否则,我们应该将剩余的部分跟下一个data事件接收到的数据连起来
    this._text = lines.pop();
    this._lines = this._lines.concat(lines);

  }

  // 解析下一个结果,如果没有则返回null
  next() {

    const lines = this._lines;
    const first = lines[0];

    // 去掉指定数量的行,并且返回结果
    const popResult = (lineNumber, result) => {
      this._lines = this._lines.slice(lineNumber);
      return this.result = result;
    };

    // 返回空结果
    const popEmpty = () => {
      return this.result = false;
    };

    if (lines.length < 1) return popEmpty();

    switch (first[0]) {

      case '+':
        return popResult(1, {data: first.slice(1)});

      case '-':
        return popResult(1, {error: first.slice(1)});

      case ':':
        return popResult(1, {data: Number(first.slice(1))});

      case '$': {
        const n = Number(first.slice(1));
        if (n === -1) {
          // 如果是 $-1 表示空结果
          return popResult(1, {data: null});
        } else {
          // 否则取后面一行作为结果
          const second = lines[1];
          if (typeof second !== 'undefined') {
            return popResult(2, {data: second});
          } else {
            return popEmpty();
          }
        }
      }

      case '*': {
        const n = Number(first.slice(1));
        if (n === 0) {
          return popResult(1, {data: []});
        } else {
          const array = [];
          let i = 1;
          for (; i < lines.length && array.length < n; i++) {
            const a = lines[i];
            const b = lines[i + 1];
            if (a.slice(0, 3) === '$-1') {
              array.push(null);
            } else if (a[0] === ':') {
              array.push(Number(a.slice(1)));
            } else {
              if (typeof b !== 'undefined') {
                array.push(b);
                i++;
              } else {
                return popEmpty();
              }
            }
          }
          if (array.length === n) {
            return popResult(i, {data: array});
          } else {
            return popEmpty();
          }
        }
      }

      default:
        return popEmpty();

    }

  }

}

module.exports = RedisProto;

执行上文中的测试代码可得到如下结果:

{ data: '123456' }
{ data: [ 'abc', 'aa1', 'a' ] }
{ error: 'ERR unknown command \'help\'' }
{ data: 'OK' }
{ data: 5 }
{ data: [ 'hello', null, 'world' ] }

实现 Redis 客户端

上文我们已经实现了一个简单的解析器,其可以通过push()将接收到的数据片段加进去,然后我们只需要不断地调用next()来获取下一个解析出来的结果即可,直到其返回false,在下一次收到数据时,重复刚才的动作。

新建文件index.js

'use strict';

/**
 * 简单Redis客户端
 *
 * @author Zongmin Lei <leizongmin@gmail.com>
 */

const events = require('events');
const net = require('net');
const RedisProto = require('./proto');

class Redis extends events.EventEmitter {

  constructor(options) {
    super();

    // 默认连接配置
    options = options || {};
    options.host = options.host || '127.0.0.1';
    options.port = options.port || 6379;
    this.options = options;

    // 连接状态
    this._isClosed = false;
    this._isConnected = false;

    // 回调函数列表
    this._callbacks = [];

    this._proto = new RedisProto();

    this.connection = net.createConnection(options.port, options.host, () => {
      this._isConnected = true;
      this.emit('connect');
    });

    this.connection.on('error', err => {
      this.emit('error', err);
    });

    this.connection.on('close', () => {
      this._isClosed = true;
      this.emit('close');
    });

    this.connection.on('end', () => {
      this.emit('end');
    });

    this.connection.on('data', data => {
      this._pushData(data);
    });

  }

  // 发送命令给服务器
  sendCommand(cmd, callback) {
    return new Promise((resolve, reject) => {

      const cb = (err, ret) => {
        callback && callback(err, ret);
        err ? reject(err) : resolve(ret);
      };

      // 如果当前连接已断开,直接返回错误
      if (this._isClosed) {
        return cb(new Error('connection has been closed'));
      }

      // 将回调函数添加到队列
      this._callbacks.push(cb);
      // 发送命令
      this.connection.write(`${cmd}\r\n`);

    });
  }

  // 接收到数据,循环结果
  _pushData(data) {

    this._proto.push(data);

    while (this._proto.next()) {

      const result = this._proto.result;
      const cb = this._callbacks.shift();

      if (result.error) {
        cb(new Error(result.error));
      } else {
        cb(null, result.data);
      }

    }

  }

  // 关闭连接
  end() {
    this.connection.destroy();
  }

}

module.exports = Redis;

说明:

  • 每次data事件接收到结果时,直接将其push()RedisProto中,并尝试执行next()获得结果
  • 因为命令的执行结果都是按照顺序返回的,所以我们只需要按顺序从this._callbacks中取出最前面的元素,直接执行回调
  • 如果连接已经断开,则不允许再执行命令,直接返回connection has been closed错误
  • sendCommand()同时支持callbackpromise方式的回调,但是套路跟上一篇文章《如何用 Node.js 编写一个 API 客户端》稍有不同

新建测试文件test.js

'use strict';

const Redis = require('./index');
const client = new Redis();

client.sendCommand('GET a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);
});

client.sendCommand('GET b', (err, ret) => {
  console.log('b=%s, err=%s', ret, err);
});

client.sendCommand('KEYS *IO*', (err, ret) => {
  console.log('KEYS *IO*=%s, err=%s', ret, err);
});

client.sendCommand('OOXX', (err, ret) => {
  console.log('OOXX=%s, err=%s', ret, err);
});

client.sendCommand('SET a ' + Date.now())
  .then(ret => console.log('success', ret))
  .catch(err => console.log('error', err))
  .then(() => client.end());

执行测试文件node test.js可看到类似如下的结果:

a=1463041835231, err=null
b=null, err=null
KEYS *IO*=sess:cz5F-npwOnw0FmesT6JjqJPL13IO8AzV,sess:NS90IkF6uZNAm-FPEAWXHuh3JrIW1-IO, err=null
OOXX=undefined, err=Error: ERR unknown command 'OOXX'
success OK

从结果中可以看出我们这个 Redis 客户端已经基本能用了。

更友好的接口

上文我们实现了一个sendCommand()方法,理论上可以通过该方法执行任意的 Redis 命令,但是我们可能更希望每条命令有一个对应的方法,比如sendCommand('GET a')我们可以写成get('a'),这样看起来会更直观。

首先在index.js文件头部载入fspath模块:

const fs = require('fs');
const path = require('path');

然后给Redis类增加_bindCommands()方法:

_bindCommands() {

  const self = this;

  // 绑定命令
  const bind = (cmd) => {
    return function () {

      let args = Array.prototype.slice.call(arguments);
      let callback;
      if (typeof args[args.length - 1] === 'function') {
        callback = args.pop();
      }

      // 每个参数使用空格分隔
      args = args.map(item => Array.isArray(item) ? item.join(' ') : item).join(' ');

      return self.sendCommand(`${cmd} ${args}`, callback);

    };
  };

  // 从文件读取命令列表
  const cmdList = fs.readFileSync(path.resolve(__dirname, 'cmd.txt')).toString().split('\n');
  for (const cmd of cmdList) {

    // 同时支持大写和小写的函数名
    this[cmd.toLowerCase()] = this[cmd.toUpperCase()] = bind(cmd);

  }

}

然后在Redis类的constructor()方法尾部增加以下代码:

this._bindCommands();

由于在_bindCommands()中通过读取cmd.txt文件来读取 Redis 的命令列表,所以还需要新建文件cmd.txt,内容格式为每条命令一行(由于篇幅限制,本文只列出需要用到的几条命令):

GET
SET
AUTH
MULTI
EXEC
KEYS

把测试文件test.js改为以下代码:

'use strict';

const Redis = require('./index');
const client = new Redis();

client.get('a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);
});

client.get('b', (err, ret) => {
  console.log('b=%s, err=%s', ret, err);
});

client.keys('*IO*', (err, ret) => {
  console.log('KEYS *IO*=%s, err=%s', ret, err);
});

client.set('a', Date.now())
  .then(ret => console.log('success', ret))
  .catch(err => console.log('error', err))
  .then(() => client.end())

重新执行node test.js可看到结果跟上文还是一致的。

简单容错处理

假如将测试文件test.js改为这样:

'use strict';

const Redis = require('./index');
const client = new Redis();

client.get('a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);

  client.end();

  client.get('b', (err, ret) => {
    console.log('b=%s, err=%s', ret, err);
  });
});

在完成get('a')的时候,我们执行client.end()关闭了连接,然后再执行get('b'),大多数情况下将会得到如下的结果:

a=1463042964235, err=null

get('b')的回调函数并没有被执行,因为我们在关闭连接后,再也没有收到服务端返回的结果。另外也有可能是因为其他原因,客户端与服务端的连接断开了,此时我们应该能执行回调并返回一个错误。

在文件index.js中给Redis类增加一个方法_callbackAll()

_callbackAll() {

  for (const cb of this._callbacks) {
    cb(new Error('connection has been closed'));
  }
  this._callbacks = [];

}

另外,在constructor()方法内,将监听连接的close事件部分代码改成这样:

this.connection.on('close', () => {
  this._isClosed = true;
  this.emit('close');
  this._callbackAll();
});

重新执行node test.js,从执行结果可看出所有回调函数均已被执行:

a=1463042964235, err=null
b=undefined, err=Error: connection has been closed

还存在的问题

看起来这个模块已经能正常使用了,但是其实并不完善。跟NPM上的ioredis模块起来还存在以下问题:

  • 不支持multi()命令
  • 不支持publishsubscribe命令
  • 不能解析更复杂的返回结果,比如command命令的返回结果
  • 不支持更多的连接选项,比如密码验证
  • 可能存在unicode字符被截断问题
  • 因为结果是通过\r\n来分行的,如果一条数据里面本身包含\r\n字符,可能会解析出错
  • 没有严格的测试,假如服务端返回了一个非预期的格式,我也不知道程序会咋样
  • RedisProto解析结果的算法还是可以优化的,目前这个只能算是大概能用
  • 如果连接意外断开了,我们可能希望能自动重新连接而不是直接报错

好了,剩下的就交给你啦。

参考链接

24 回复

很有意思的招聘,可惜不招前端

不错,赞一个。

都是干货,cool!

新手就问一下,后端要从哪些地方入手比较好。。

雷总是要成为第二个轮子哥的节奏。。

老雷你那么牛,为什么不来我的团队。

get √ 多谢分享

老雷你那么牛,为什么不来我的团队。

@sofish @JacksonTian 你们啥时候成立广州研发根据地?

总之是学到东西了,一不小心跟着老雷学了好多东西……

感觉又是一次误导。 如果面试题是这样的,说明这个面试的人根本不知道如何面试一个人。

@calidion 求指教:这样存在什么问题,应该怎么面试?

@leizongmin

首先,这样的面试并不全面,考察的点太窄。 其次,处理一个问题可以有多种办法,也许他的回答并不是跟你的思路是一致的。 再次,对于问题域的理解需要更多的时间,你问的问题越细,所能考察的人数越有限,知识面越有限。 然后,面试应该是一次综合的考查,并不完全是考察代码。 还有,解决你所描述的问题,花费的时间太长,如果你想找到合适的人选择,你一天最多面试两个,跟一天面试20个,那个效率高? 最后,如果你面试通过对话都不能了解对方,你是不是在评价能力上有问题?那么让你招聘本身是不是就存在问题。

题外的,即使他写出来你满意的代码,他的人让你不爽,写这么多代码又有毛用?面试也不完全是考察技术,技术再牛跟你不搭也是没用的。

面试是一个交流了解的过程,并不是炫技术的过程,也不需要难倒面试者。。 一方面是知识面的考察,一方面的代码能力的考察。 你要准备100%个知识点。然后设定技术门槛,比如60%通过表示技术过关。 写复杂代码在面试过程中是没有意义的。 据我的经验,如果事先没有准备,90%以上的人是写不好冒泡排序的。 但是这些不影响他们写出来正确的代码,不影响他们有正确的逻辑分析能力。 当然最重要的是不影响你对他们的技术能力的评价。

@calidion 首先,整个面试过程并不是一上来就写代码做题,是通过QQ聊了多次,且面试的时候也当面聊过,这只是最后一个环节 其次,这个面试题是针对特定一个人的,根据之前了解的结果设计的,并不是一套题通吃 再次,这确实是一个很花费时间的题目,而且也不如你想要的那样一天面试20个的高效率,整个下午只面试了一个人 还有,这样的面试题目只是为了了解对方在Node.js后端项目编程的能力,题目本身也没有标准答案,也不管对方能不能做出来,其出发点并不是为了难道面试者

最后,你以上所有的判断都是基于一个不了解背景的前提下得出的结论

题外的,当时面试的情景与本文并没有直接的关系,我也没有在此透露更多细节。如果你觉得写这样的一篇文章是在炫技术,我也不知道该说什么好

在作为面试官的角色我确实还缺乏经验,也非常感谢你能分享你的心得 你所说的观点我大部分是认同的,但依然对你不了解具体情况的前提下乱贴标签的行为感到不爽

@leizongmin

我很高兴你找到愿意跟你这样玩的技术人员。但是我不认为你这种面试是一种有效的面试。

你可以很个性化的去面试,但是我不认为这个具备推广价值。

我对你的特定背景我是不了解的,但是我了解如何去评价一个技术人员。

我不知道贴了什么标签,我只是质疑这样面试是不是有效。

如果标题只是如何编写redis客户端,我想很多人会喜欢这种分享。

我也不会在任何异议,但是写着面试题,我觉得会有一定的误导。

@calidion 我并无意推广这样的面试方式,前面已经强调,这文章与面试无太大的联系。当然为了避免你再次看到这样的标题而引起不快,已经将该部分敏感内容删除了。

@leizongmin

谢谢分享,感谢理解。

Mark 自豪地采用 CNodeJS ionic

学习了

来自酷炫的 CNodeMD

好文,自己tcp/ip协议理解还比较弱,还要再学习一下。

老雷,我自己写了一个demo,发现一个问题,redis在发送完数据之后,会关闭tcp连接,请问这是怎么回事?我想让客户端和redis的tcp连接一直保持长连接,该怎么办?麻烦你有空的时候,帮我看一下。代码如下

const net = require("net");
const client = net.createConnection({port:6379,host:'127.0.0.1'},()=>{
	var data = "";
	client.on('data',(chunk)=>{
		data += chunk;
	});
	client.on('end',()=>{
		console.log("data:",data);
		console.log('end');
	});
	client.on('close',()=>{
		console.log('close');
	});
	client.on('error',()=>{
		console.log('error');
	});
	client.end('get name\r\n');
});

输出结果如图: 1.png

@muyoucun557 你使用的是client.end('get name\r\n');来发送命令,在命令发送完毕之后是你这一段主动关闭了传输通道,所以Redis服务端也关闭了它那一端。你应该改成client.send('get name\r\n');

@leizongmin 谢谢老雷。还是得多看文档,QAQ

@leizongmin 看老雷的这篇文章,处处都是惊喜(^__^) 嘻嘻……

回到顶部