发布一个node.js编写的消息服务器,支持最新的websocket草案
发布于 13 年前 作者 mallon 8054 次浏览 最后一次编辑是 8 年前

<p>简介 Json Messaging是使用node.js技术构建的发布/订阅类型的消息服务器,具有如下特性: 1、支持TCP和WebSocket协议; 2、传输帧使用Json格式; 3、可以使用正则表达式订阅消息目的地,正则表达式中可以包含“捕获”,所有目的地匹配该正则表达式的消息,连同目的地的“捕获”都将发送到订阅方; 4、一个客户端可以订阅多个消息目的地; 5、为了简化设计,服务器端不持久化消息。 项目 https://sourceforge.net/projects/jsonmessaging/ 下载 https://sourceforge.net/projects/jsonmessaging/files/ 致谢 Json Messaging消息服务器使用了很多第三方的框架和技术,感谢他们辛勤的工作。 Json:http://www.json.org node.js:http://nodejs.org node-uuid:https://github.com/broofa/node-uuid WebSocket-Node:https://github.com/Worlize/WebSocket-Node 使用方法 1、编译安装node.js; 2、打开server/config.js,可以配置TCP和WebSocket端口; 3、启动消息服务器:node server/server.js。 例子 使用最新版本的Firefox或者Chrome打开下面的HTML文件可以发送和接收消息。 <!DOCTYPE html>

Json Messaging Example

    div#output {
        border: 1px solid #000;
        width: 960px;
        height: 450px;
        overflow: auto;
        background-color: #333;
        color: #6cf;
    }</p>

<pre><code> strong { color: #f66; }

input#input {
    border: 1px solid #000;
    width: 640px;
}

button {
    border: 1px solid #000;
    width: 100px;
}

</style> <script> // connect to the Json Messaging server and return an ‘connection’ object function connect(host, port, messageListener, errorListener) { window.WebSocket = window.WebSocket || window.MozWebSocket;

    if (!window.WebSocket) {
        alert('Your browser does not support WebSocket.');
        return null;
    }

    var connection = new WebSocket('ws://' + host + ':' + port);

    connection.onmessage = function(message) {
        try {
            var parsed = JSON.parse(message.data);
            switch (parsed.type) {
                case 'message':
                    if (messageListener) {
                        messageListener(parsed.content, parsed.match);
                    }
                    break;
                case 'error':
                    if (errorListener) {
                        errorListener(parsed.content);
                    }
                    break;
                default:
                    throw new Error('Unknown message type ' + parsed.type);
                    break;
            }
        } catch (e) {
            console.warn(e);
            alert(e);
        }
    };

    connection.publish = function(content, destination) {
        connection.send(JSON.stringify({
            type: 'publish',
            destination: destination,
            content: content
        }));
    };

    connection.subscribe = function(destination) {
        connection.send(JSON.stringify({
            type: 'subscribe',
            destination: destination
        }));
    };

    connection.unsubscribe = function(destination) {
        connection.send(JSON.stringify({
            type: 'unsubscribe',
            destination: destination
        }));
    };

    return connection;
}

// the 'connection' object
var connection = null;

var output = null;

var input = null;

// initialize
window.onload = function() {
    output = document.getElementById('output');
    input = document.getElementById('input');

    // connect to the local server
    connection = connect(
            'localhost',
            8155,
            // message handler
            function(content, match) {
                output.innerHTML += ('&lt;strong&gt;Message: &lt;/strong&gt;' + content + '&lt;br&gt;\n');
            },
            // error handler
            function(content) {
                output.innerHTML += ('&lt;strong&gt;Error: &lt;/strong&gt;' + content + '&lt;br&gt;\n');
            }
    );

    // subscribe a topic
    connection.onopen = function() {
        connection.subscribe('test');
    };
};

function _send() {
    connection.publish(input.value, 'test');
}

function _clear() {
    output.innerHTML = '';
}

</script> </code></pre>

<p> </p>

<p> Send Clear

下面的C程序发送三条HelloWorld消息,第一条是英文,第二条是中文,第三条是Unicode转义的中文。注意源代码必须以UTF-8编码保存。</p>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<h1>include </h1>

<p>int main(int argc, char ** argv) { int fd; struct sockaddr<em>in addr; int ret; const char publish</em>frame<em>1[] = "{“type”:“publish”,“destination”:“test”,“content”:“Hello World”}"; const char publish</em>frame<em>2[] = "{“type”:“publish”,“destination”:“test”,“content”:“你好世界”}"; const char publish</em>frame_3[] = "{“type”:“publish”,“destination”:“test”,“content”:"\u4f60\u597d\u4e16\u754c"}";</p>

<pre><code>fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(“127.0.0.1”); addr.sin_port = htons(8153);

ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); printf("%d\n", ret);

ret = write(fd, publish_frame_1, sizeof(publish_frame_1)); printf("%d\n", ret);

ret = write(fd, publish_frame_2, sizeof(publish_frame_2)); printf("%d\n", ret);

ret = write(fd, publish_frame_3, sizeof(publish_frame_3)); printf("%d\n", ret); </code></pre>

<p>} 帧格式 消息服务器的应用层数据帧采用Json格式,使用UTF-8编码的纯文本,在TCP协议中,使用’\0’作为帧间分隔,在WebSocket协议中遵循WebSocket草案标准。 帧格式有5类,其中,客户端到服务器端的3类,服务器端到客户端的2类。 客户端到服务器端 发布帧 客户端发送一条消息到服务器的目的地中,所有连接到服务器并且订阅了该目的地(正则表达式匹配)的客户端都能接收到该消息。 帧格式为: { “type”: “publish”, “destination”: <消息目的地>, “content”: <消息内容> } 其中,消息目的地为字符串类型;消息内容同样也必须是Json格式的。 订阅帧 客户端订阅服务器的一个目的地,所有匹配该目的地的消息都会发送到该客户端。 帧格式为: { “type”: “subscribe”, “destination”: <消息目的地> } 其中,消息目的地可以是正则表达式,且正则表达式中可以含有“捕获”,服务器会使用该正则表达式匹配发送的消息目的地,如果符合,则会把该消息连同匹配结果一并发给客户端,在下面的“消息帧”介绍中有具体的例子。 同一个客户端可以订阅多个目的地。 取消订阅帧 客户端取消订阅服务器的一个目的地。 帧格式为: { “type”: “subscribe”, “destination”: <消息目的地> } 其中,消息目的地等于订阅帧中的消息目的地。 当客户端断开连接后,服务器端会自动取消该客户端的所有订阅。 服务器端到客户端 消息帧 一旦消息目的地匹配,服务器端会把匹配结果连同消息内容发给客户端。 帧格式为: { “type”: “message”, “match”: <匹配结果>, “content”: <消息内容> } 匹配结果为一个数组,至少包含一个元素,即订阅的消息目的地;如果订阅的消息目的地是正则表达式且其中含有“捕获”,那么从第二往后的元素为捕获结果,参考JavaScript 正则表达式规范。 举例: 假设设备的网口状态信息在消息服务中发布,规定目的地格式为:“/devices/<设备名>/<网口名>”;消息内容为:“down”表示停止、“up”表示启动。 下面两个发布帧,表示设备a的第1个网口停止了,而设备b的第0个网口启动了: {“type”:“publish”,“destination”:"/devices/a/if1",“content”:“down”} {“type”:“publish”,“destination”:"/devices/b/if0",“content”:“up”} 如果客户端订阅目的地为“/devices/.<em>”,那么它将能收到所有设备的所有网口的状态消息,接收到的消息帧如下: {“type”:“message”,“match”:["/devices/a/if1"],“content”:“down”} {“type”:“message”,“match”:["/devices/b/if0"],“content”:“up”} 如果想在程序中更方便地对设备和网口做分类处理,可以把订阅目的地改为“/devices/(.</em>)/(.*)”,其中小括号即为“捕获”。 接收到的消息帧会变为: {“type”:“message”,“match”:["/devices/a/if1",“a”,“if1”],“content”:“down”} {“type”:“message”,“match”:["/devices/b/if0",“b”,“if0”],“content”:“up”} 可以看到,match中增加了捕获的结果。 错误帧 如果服务器端产生错误,例如客户端发送的帧超长、非Json格式等,将会向客户端返回错误帧。 帧格式为: { “type”: “error”, “content”: <错误内容> } 客户端可以对错误进行相应的处理。 源代码结构 server.js 程序入口。 config.js 全局配置,其中的udpPort并没有使用,因为UDP难以知晓客户端状态,所以不打算实现UDP协议。 log.js 控制台日志,相比其它第三方的日志模块的特点是使用简单,而且能够输出日志产生的源代码的位置,便于调试。 protocol.js 协议帧的包装。 exchange.js 负责处理发布和订阅的消息,是服务器代码的核心部分。 tcp.js TCP协议的实现。 ws.js WebSocket协议的实现。</p>

1 回复

@mallon 格式有问题喔

回到顶部