使用RabbitMQ,WebSocket搭建IM (抛砖引玉)
发布于 9 年前 作者 Tonyce 23946 次浏览 最后一次编辑是 8 年前 来自 分享

尝试使用RabbitMQ,WebSocket搭建IM

最近一直在了解 RabbitMQ,想着用消息队列来实现 IMWebSocket 已经成熟,即时通讯发展的也很好,可是自己还没了解尝试过。之前想着是把所有的 socket 连同 id 做一个 k-v。可总感觉不靠谱。后来听到可以使用消息队列。所以就尝试下,摸索下,发现困难,解决困难。

RebbitMQ 的资料没有找多少,官网看起来有些费力。索性就直接看demo。这样会比较快些的了解。感觉一定了解后再去扒文档就会比较简单了。

跟着官网的六个demo走了两天。迷迷糊糊的了解点。所以就开始上实际业务需求,至少先跟 WebSocket 扯上边。千里之行,始于足下。关键得开始。

$ npm init

$ npm install websocket --save

$ npm install amqplib --save

走起项目才是真的。copy WebSocket 中的 example。WebSocket GitHub。 先把 WebSocket 跑起来。

WebSocket 起来了,下一步就是将socket连到 RabbitMQ 。让 RabbitMQ 根据 message 中的 to.id来分配给相应的socket。

message格式:

{
	"from": {
		"id": "1"
	},
	"to": {
		"id": "2"
	},
	"content": "hello"
}

贴下server.js代码:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');
var WebSocketServer = require('websocket').server;
var http = require('http');

global._MQConn = '';

amqp.connect('amqp://localhost', function(err, conn) {
    global._MQConn = conn;
    conn.on('close', function () {
        global._MQConn = '';
    })
});

var server = http.createServer(function(request, response) {
    console.log((new Date()) + ' Received request for ' + request.url);
    // response.writeHead(404);
    response.end("hello");
});

server.listen(8080, function() {
    console.log((new Date()) + ' Server is listening on port 8080');
});

wsServer = new WebSocketServer({
    httpServer: server,
    autoAcceptConnections: false
});

wsServer.on('request', function(request) {

    var connection = request.accept('echo-protocol', request.origin);
    var id = request.resourceURL.path.replace('/', '');

    consumeToClient(connection, id);
    
    connection.on('message', function(message) {
        // console.log("message", message);
        pushToMQ(message);
    });
    connection.on('close', function(reasonCode, description) {
        //stopConsume
        connection.MQChannel && connection.MQChannel.close();
        console.log((new Date()) + ' Peer ' + connection.remoteAddress + ' disconnected.');
    });
});

//emit to mq
function pushToMQ (message) {
    // console.log(typeof message)
    var data = ''
    if (message.type === 'utf8') {
        data = message.utf8Data;
    } else if (message.type === 'binary') {
        data = message.binaryData;
    }
    data = JSON.parse(data)
    var _ch = ""
    var MQConn = global._MQConn;
    var ex = 'direct_message';
    var severity = `${data.to.id}`
    
    if (!MQConn) {
        console.log("MQConn closed");
        return;
    }

    message = JSON.stringify(message);
    if (_ch) {
        _ch.assertExchange(ex, 'direct', {durable: true});
        _ch.publish(ex, severity, new Buffer(message), {persistent: true});
    }else {
        MQConn.createChannel(function(err, ch) {
            _ch = ch
            ch.assertExchange(ex, 'direct', {durable: true});
            ch.publish(ex, severity, new Buffer(message), {persistent: true});
        });
    }
}

//consume mq
function consumeToClient (connection, id) {
    
    var MQConn = global._MQConn;
    var _ch = "";
    
    if (!MQConn) {
        console.log("MQConn closed");
        return;
    }

    var ex = 'direct_message';
    var severity = `${id}`

    if (_ch) {
        consume(_ch)
    }else {
        MQConn.createChannel(function(err, ch) {
            _ch = ch
            consume(_ch)
        });
    }

    function consume(_ch) {
        connection.MQChannel = _ch;

        _ch.assertExchange(ex, 'direct', {durable: true});

        _ch.assertQueue('', {exclusive: true}, function(err, q) {
            _ch.bindQueue(q.queue, ex, severity);
            _ch.consume(q.queue, function(msg) {
                var message = msg.content.toString();
                    message = JSON.parse(message);
                // console.log("consumeing", message);
                if (message.type === 'utf8') {
                    // console.log('Received Message: ' + message.utf8Data);
                    connection.sendUTF(message.utf8Data);
                } else if (message.type === 'binary') {
                    // console.log('Received Binary Message of ' + message.binaryData.length + ' bytes');
                    connection.sendBytes(message.binaryData);
                }
            }, {noAck: true});
        });
    }
}

大概逻辑就是,用户登录后,将id作为path连到 WebSocket 这样就能将id作为‘路由’将 socket 绑定到相应的 queue上

	···
    var severity = `${id}`
    ···
	_ch.bindQueue(q.queue, ex, severity);
	···

**这样发给这个 idmessage 就会被相应的 channel 消耗到这个id的 socket。将message放到相应的路由下也是同样的逻辑。pushToMQ (message) **

这是server端,client就比较简单了,就不再贴了。详细请看 GitHub

现在初步的就先完成了这些,还有一些像离线处理或其它问题还没考虑到,也肯定有很多不足和纰漏,也希望大家能建议更好的解决方法或书籍资料或方向。

谢谢,期待大家指点。

Blog 地址 尝试使用RabbitMQ,WebSocket搭建IM

14 回复

我把RabbitMQ的Demo翻译了一下,且用了Nodejs来实现,献丑了,你看对你有没帮助

目录

Hello_World

@CoderIvan 太有帮助啦,谢谢

目前正在做类似的事情~ 可以交流下

可以试一下用socket.io.js

刚继续写这个聊天的东西。http://www.stechclub.com/ 这里有。另外,可以查看github,目前在剥离一些核心业务,做成一个类似qq的web聊天工具。

@jasoncodingnow 好的 学习 共勉

问一下楼主,这么设计是不是每有一个用户登录,就会创建一个属于他自己的queue,然后设置routingkey为自己的ID?

@decadenceqi 目前我是这样做的

@Tonyce 然后没一个用户创建一个mq的connection嘛 mq有connection上限吧

@decadenceqi 上限这个问题还没考虑过

@Tonyce 噢 明白了 谢谢哈

@Tonyce 楼主还在做im 吗?可以分享下吗?

socket.io完全就可以满足需求啊

回到顶部