利用进程通信实现Cluster共享内存
发布于 11 年前 作者 x6doooo 9788 次浏览 最后一次编辑是 8 年前

Node.js的标准API没有提供进程共享内存,然而通过IPC接口的send方法和对message事件的监听,就可以实现一个多进程之间的协同机制,通过通信来操作共享内存。 ##IPC的基本用法:

// worker进程 发送消息
process.send(‘读取共享内存’);
 
// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
     // 有worker进程建立,即开始监听message事件
     worker.on(‘message’, function(data) {
          // 处理来自worker的请求
          // 回传结果
          worker.send(‘result')
     });
});

在Node.js中,通过send和on(‘message’, callback)实现的IPC通信有几个特点。首先,master和worker之间可以互相通信,而各个worker之间不能直接通信,但是worker之间可以通过master转发实现间接通信。另外,通过send方法传递的数据,会先被JSON.stringify处理后再传递,接收后会再用JSON.parse解析。所以Buffer对象传递后会变成数组,而function则无法直接传递。反过来说,就是可以直接传递除了buffer和function之外的所有数据类型(已经很强大了,而且buffer和function也可以用变通的方法实现传递)。

基于以上特点,我们可以设计一个通过IPC来共享内存的方案:

1、worker进程作为共享内存的使用者,并不直接操作共享内存,而是通过send方法通知master进程进行写入(set)或者读取(get)操作。

2、master进程初始化一个Object对象作为共享内存,并根据worker发来的message,对Object的键值进行读写。

3、由于要使用跨进程通信,所以worker发起的set和get都是异步操作,master根据请求进行实际读写操作,然后将结果返回给worker(即把结果数据send给worker)。

##数据格式

为了实现进程间异步的读写功能,需要对通信数据的格式做一点规范。

首先是worker的请求数据:

requestMessage = {
    isSharedMemoryMessage: true,  // 表示这是一次共享内存的操作通信
    method: ‘set’, // or ‘get’ 操作的方法
    id: cluster.worker.id,  // 发起操作的进程(在一些特殊场景下,用于保证master可以回信)
    uuid: uuid,  // 此次操作的(用于注册/调用回调函数)
    key: key,  // 要操作的键
    value: value  // 键对应的值(写入)
}

master在接到数据后,会根据method执行相应操作,然后根据requestMessage.id将结果数据发给对应的worker,数据格式如下:

responseMessage = {
    isSharedMemoryMessage: true,  // 标记这是一次共享内存通信
    uuid: requestMessage.uuid,  // 此次操作的唯一标示
    value: value  // 返回值。get操作为key对应的值,set操作为成功或失败
}

规范数据格式的意义在于,master在接收到请求后,能够将处理结果发送给对应的worker,而worker在接到回传的结果后,能够调用此次通信对应的callback,从而实现协同。

规范数据格式后,接下来要做的就是设计两套代码,分别用于master进程和worker进程,监听通信并处理通信数据,实现共享内存的功能。

##User类

User类的实例在worker进程中工作,负责发送操作共享内存的请求,并监听master的回信。

var User = function() {
    var self = this;
    self.__uuid__ = 0;
 
    // 缓存回调函数
    self.__getCallbacks__ = {};
 
    // 接收每次操作请求的回信
    process.on('message', function(data) {
        
        if (!data.isSharedMemoryMessage) return;
        // 通过uuid找到相应的回调函数
        var cb = self.__getCallbacks__[data.uuid];
        if (cb && typeof cb == 'function') {
            cb(data.value)
        }
        // 卸载回调函数
        self.__getCallbacks__[data.uuid] = undefined;
    });
};
 
// 处理操作
User.prototype.handle = function(method, key, value, callback) {
 
    var self = this;
    var uuid = self.__uuid__++;
 
    process.send({
        isSharedMemoryMessage: true,
        method: method,
        id: cluster.worker.id,
        uuid: uuid,
        key: key,
        value: value
    });
 
    // 注册回调函数
    self.__getCallbacks__[uuid] = callback;
 
};
 
User.prototype.set = function(key, value, callback) {
    this.handle('set', key, value, callback);
};
 
User.prototype.get = function(key, callback) {
    this.handle('get', key, null, callback);
};

##Manager类

Manager类的实例在master进程中工作,用于初始化一个Object作为共享内存,并根据User实例的请求,在共享内存中增加键值对,或者读取键值,然后将结果发送回去。

var Manager = function() {
 
    var self = this;
    
    // 初始化共享内存
    self.__sharedMemory__ = {};
        
    // 监听并处理来自worker的请求
    cluster.on('online', function(worker) {
        worker.on('message', function(data) {
            // isSharedMemoryMessage是操作共享内存的通信标记
            if (!data.isSharedMemoryMessage) return;
            self.handle(data);
        });
    });
};
 
Manager.prototype.handle = function(data) {
    var self = this;
    var value = this[data.method](data);
 
    var msg = {
        // 标记这是一次共享内存通信
        isSharedMemoryMessage: true,             
        // 此次操作的唯一标示
        uuid: data.uuid,
        // 返回值
        value: value
    };
 
    cluster.workers[data.id].send(msg);
};
 
// set操作返回ok表示成功
Manager.prototype.set = function(data) {
    this.__sharedMemory__[data.key] = data.value;
    return 'OK';
};
 
// get操作返回key对应的值
Manager.prototype.get = function(data) {
    return this.__sharedMemory__[data.key];
};

##使用方法

if (cluster.isMaster) {
 
    // 初始化Manager的实例
    var sharedMemoryManager = new Manager();
 
    // fork第一个worker
    cluster.fork();
 
    // 1秒后fork第二个worker
    setTimeout(function() {
        cluster.fork();
    }, 1000);
      
} else {
 
    // 初始化User类的实例
    var sharedMemoryUser = new User();
 
    if (cluster.worker.id == 1) {
        // 第一个worker向共享内存写入一组数据,用a标记
        sharedMemoryUser.set('a', [0, 1, 2, 3]);
    }
 
    if (cluster.worker.id == 2) {
        // 第二个worker从共享内存读取a的值
        sharedMemoryUser.get('a', function(data) {
            console.log(data);  // => [0, 1, 2, 3]
        });
    }
   
}

以上就是一个通过IPC通信实现的多进程共享内存功能,需要注意的是,这种方法是直接在master进程的内存里缓存数据,必须注意内存的使用情况,这里可以考虑加入一些简单的淘汰策略,优化内存的使用。另外,如果单次读写的数据比较大,IPC通信的耗时也会相应增加。

完整代码:https://github.com/x6doooo/sharedmemory

21 回复

我们服务器是通过socket进行rpc进程通信的

为什么不使用redis或memcache等第三方服务,这样可以使应用更健壮~

你有没有测试并发的? 没有锁机制,你这代码根本用不了 调用 sharedMemoryUser.set(‘a’, [0, 1, 2, 3]); 内面 var uuid = self.uuid++; 这数据根本就乱了,代码根本用不上

真不明白这也能置顶。。。。。

可能你把uuid的作用理解错了。你可以试下能不能正确读写。如果有bug可以提。

var initSharedMemory = require('../lib/sharedmemory').init;
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
var http = require('http');
var sharedMemoryController = initSharedMemory();
 

if (cluster.isMaster) {

    // transfer
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

} else {

    
	http.createServer(function(req,res){
		sharedMemoryController.set("aaa", "bbbb");
		res.end();
	}).listen(8000);
}
D:\APMServ5.2.6\Apache\bin>ab -c 100 -n 1000 http://127.0.0.1:8000/


245 ' uuid++++++++++'
244 ' uuid++++++++++'
230 ' uuid++++++++++'
246 ' uuid++++++++++'
256 ' uuid++++++++++'
245 ' uuid++++++++++'
231 ' uuid++++++++++'
257 ' uuid++++++++++'
246 ' uuid++++++++++'
258 ' uuid++++++++++'
247 ' uuid++++++++++'
247 ' uuid++++++++++'
232 ' uuid++++++++++'
259 ' uuid++++++++++'
248 ' uuid++++++++++'
233 ' uuid++++++++++'
248 ' uuid++++++++++'
260 ' uuid++++++++++'
234 ' uuid++++++++++'
261 ' uuid++++++++++'
235 ' uuid++++++++++'
262 ' uuid++++++++++'
236 ' uuid++++++++++'
249 ' uuid++++++++++'
237 ' uuid++++++++++'
263 ' uuid++++++++++'
250 ' uuid++++++++++'
249 ' uuid++++++++++'
250 ' uuid++++++++++'
User.prototype.handle = function(method, key, value, cb) {

    var self = this;
    var uuid = self.uuid();
	console.log(uuid," uuid++++++++++");
    process.send({
        isSharedMemoryMessage: true,
        method: method,
        id: cluster.worker.id,
        uuid: uuid,
        key: key,
        value: value
    });

    self.__getCallbacks__[uuid] = cb;

};

我把你的测试代码改了一下,你再用ab试一下,可以看到并发状态下,依然可以根据key读取到正确的值。

var initSharedMemory = require('../lib/sharedmemory').init;
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
var http = require('http');
var sharedMemoryController = initSharedMemory();

if (cluster.isMaster) {

    // transfer
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

} else {

    var key, value;

    //先存好一些数据,键和值都是worker的id加上一个数字
    for (var i = 0; i < 1000; i++) {
        key = value = cluster.worker.id + '-' + i;
        sharedMemoryController.set(key, value);
    }

    //访问计数
    var count = 0;

    http.createServer(function(req,res){
  //这里根据访问次数获取前面存放的值
        var key = cluster.worker.id + '-' + count++;
        sharedMemoryController.get(key, function(data){
            //key能够获取到对应的值,则说明没问题
            console.log(key, ' => ', data);
        });
        res.end();
    }).listen(8000);
}

…哥,并发就是要解决写操作啊。。。。。。

var key = cluster.worker.id + ‘-’ + count++; 你这行代码已经是有问题了

反正js没有提供锁的机制,用并发都会把数据弄成垃圾 除非是只读,一个网站,有可能全是只读操作没有写操作吗?

还是等出E7 标准吧,看看有没有锁,没有也只能玩玩

哥,以你的例子来看

sharedMemoryController.set("aaa", "bbbb");

并发调用这个写入操作,会制造什么样垃圾数据?请稍微详细描述一下,谢谢。

@x6doooo @x6doooo var cont =0; sharedMemoryController.set(“aaa”, count++); sharedMemoryController.set(“aaa”, count++); sharedMemoryController.set(“aaa”, count++); 你看结果会怎样?? 我们要的结果是 count ==3 结果可能变成 1 或者 2 再来一个 假如我同时执行 操作1 sharedMemoryController.set(“aaa”, “aaa”); 操作2 sharedMemoryController.set(“aaa”, “bbbb”);

因为并发的原因,程序先执行了 操作2 再执行操作1 本来结果是 “bbbb” 变成 “aaa”

@solqkwgkwg 我大概明白你的纠结点了。

245 ' uuid++++++++++'
244 ' uuid++++++++++'
230 ' uuid++++++++++'
246 ' uuid++++++++++'
256 ' uuid++++++++++'
245 ' uuid++++++++++'
231 ' uuid++++++++++'

这是你上面打印的uuid,截了一小段。这里看上去是乱序的,但实际上单个进程是顺序的,并发调度给不同worker的次序不同,所以看上去是乱序的。 我在uuid的生成方法里加上时间戳和worker的id,可以看到每个worker的操作都是顺序的。 截一小段:

1397136263757-1-72
1397136263758-2-30
1397136263759-1-73
1397136263759-1-74
1397136263760-1-75
1397136263760-1-76
1397136263761-3-52
1397136263762-1-77
1397136263762-3-53
1397136263762-1-78
1397136263762-1-79
1397136263762-3-54
1397136263763-3-55
1397136263763-2-31
1397136263764-2-32
1397136263764-1-80
1397136263764-2-33
1397136263764-1-81

多个进程并发也是有顺序的,只不过在单位时间里调度给每个worker的请求不太均匀。(这个可能和系统有关,我暂时不太确定。)

如果单个进程里的set操作必须有顺序,可以用set的回调执行,比如: sharedMemoryController.set(“aaa”, “aaa”, function() { sharedMemoryController.set(“aaa”, “bbbb”); });

@x6doooo 你还是不理解并发啊。。。。。。。

@x6doooo

250 ’ uuid++++++++++' 249 ’ uuid++++++++++‘ 250 ’ uuid++++++++++’

我1000次请求,最终是 1000才对, 你的 uid 又是跟处理方法绑定的, 你的内部代码已经乱了,还不是最严重?

@x6doooo 对并发有兴趣的话,可以看看 java 并发编程

@solqkwgkwg

以你说的这个count问题来解释一下。

假设共享内存里有一个count,初始值是0,每次worker接到一个http请求,就给master发通信,让这个count值递增一次。那么不考虑并发的话可以这样写:

// 先取回当前的count值 sharedMemoryController.get(‘count’, function(data){ // 在当前count值上递增 并写回共享内存 sharedMemoryController.set(‘count’, data + 1); });

非并发情况,这个流程会执行正确,但如果是并发情况,这个操作确实不成立。因为get和set都是异步,并发时这样一个流程不会连续执行。解决这个问题其实很简单,也不需要锁,还是在现有的通信机制上完成。因为无论并发多少,最终写入操作都是master进程执行的,所以只要发一个递增信号给master,让master做递增就好了。比如可以增加一个plusOne方法,每次worker接到请求,发送一个plusOne(‘count’)请求,master就会给count加1,这种情况并发多少,master也会将count计算正确。

plusOne方法的通信机制和set是一只样的,只不过递增这一步放在了master进程执行。

我把测试代码放到github上了,可以按下面的步骤来执行:

// 1、启动服务
node sharedmemory/test/test-acc.js

// 2、ab
ab -c 100 -n 1000 http://127.0.0.1:1337/

脚本会每10秒打印一次count值,并且把每个worker响应请求的次数打印出来。可以看到类似这种结果:

共享内存中count值:1000
worker-3响应请求次数:242
worker-4响应请求次数:279
worker-2响应请求次数:233
worker-1响应请求次数:246

ps:

我总觉得你误会uuid的作用了。user类里的uuid主要是标记通信用的,每次user发的通信会有一个uuid,用于注册和调用回调函数用的,不论是写还是读,每次操作能够得到这次通信的回信,那么就视为执行正确。

最初的set就是一个单纯的写入方法,一些简单的写入操作,即便并发状态,用set也没问题,这个要看使用场景。有些并发确实不能简单的使用set,比如上面这个计数问题。但基于这个模块的通信机制可以做扩展,去处理不同场景的并发需求。

以上。

@x6doooo 哥这根本行不通 进行数据修改必须是先读取这一步 至于写操作全丢给主进程处理,也行不通,因为一个处理方法很多读写逻辑代码,根本无法分离。

@solqkwgkwg 我只是想证明通过IPC通信是可以实现多进程协同的,而且可以在一些并发场景下工作。比如上面说的计数问题,证明并发的写入是可以实现的。当然这里逻辑比较简单,随着逻辑复杂度的增加,肯定还要引入其他处理机制。异步IO不是万能的,但也不是所有并发都需要锁,都要看应用场景。

我觉得浪费时间纠结在Cluster上很没必要。 如果是缓存数据, 无论是考虑到scale或者轻node进程, memcached或者redis肯定是最好的选择。 如果是信息通知, 考虑到scale, 一步到位直接redis或者zmq多方便。

回到顶部