Node 下 Http Streaming 的跨浏览器实现
发布于 14 年前 作者 qingdu 10887 次浏览 最后一次编辑是 8 年前

<p>最近考虑把整个前端架构使用http streaming方式实现<br /> <br/>对这方面做了一下调研,并在在node上实现了一个简单的原型</p> <br/><p>顺带提一下,<br /> <br/>楼下pengchun同学所提到的node chat使用的是longpoll的模型<br /> <br/>和httpstreaming同属与http comet的解决方案.<br /> <br/>不过在具体http连接的处理上有所不同<br /> <br/>long poll在数据通道每次接收到一个数据包后即关闭连接,并立即重新打开数据通道<br /> <br/>http streaming的数据通道始终处于打开状态.<br /> <br/>具体的介绍可以看这里 <a href=“http://en.wikipedia.org/wiki/Comet_(programming)#Streaming”>http://en.wikipedia.org/wiki/Comet_(programming)#Streaming</a></p> <br/><p>一些细节:<br /> <br/>由于ie下在xhr readystate=3时无法取得responseText,<br/ > <br/>因此在ie下改为通过使用htmlfile控件调用iframe实现<br /> <br/>另在输出正式数据前需现输出1k的噪音数据<br /> <br/>以解决浏览器的阻塞问题 <br/></p> <br/><p>原型功能设计如下</p> <br/><p><img title=“pipe.png” src=“http://static.data.taobaocdn.com/up/nodeclub/2011/01/pipe.png” border=“0” alt=“pipe.png” width=“600” height=“407” /></p> <br/><p>具体代码如下</p> <br/><p>pipe.js: 主服务程序</p> <br/><pre escaped=“true” lang=“javascript” line=“1”> <br/>var http = require(‘http’), <br/> fs = require(‘fs’), <br/> url = require(‘url’), <br/> page = null; <br/>// static files read & watch <br/>var readFile = function(files) { <br/> var buffers = {}; <br/> // sync read <br/> var fread = function(file, cb){ <br/> fs.readFile(file, ‘binary’, function(err, data){ <br/> if (err) { <br/> throw err; <br/> } <br/> buffers[file] = new Buffer(data, ‘binary’); <br/> console.log(‘load’, file) <br/> }); <br/> } <br/> // watch changes <br/> var watch = function watch(file) { <br/> fs.watchFile(file, {persistent: true, interval: 100}, function (curr, prev) { <br/> if (curr.mtime.getTime() != prev.mtime.getTime()) { <br/> fread(file); <br/> } <br/> }); <br/> } <br/> // run all files <br/> for (var i = 0; i < files.length; i++) { <br/> watch(files[i]); <br/> fread(files[i]); <br/> } <br/> return buffers; <br/>} <br/>// http query <br/>var httpQuery = function(u, cb){ <br/> console.log(‘http begin’); <br/> // parse url <br/> var uinfo = url.parse(u); <br/> // create client <br/> var client = http.createClient(uinfo.port ? 
uinfo.port : 80, uinfo.hostname, false); <br/> var uri = uinfo.pathname + (uinfo.search ? uinfo.search : ‘’); <br/> var req = client.request(‘GET’, uri, {‘host’: uinfo.hostname}); <br/> // send request <br/> req.end(); <br/> console.log(‘http request sent’); <br/> <br/> <br/> var len = 4096; <br/> var pointer = 0; <br/> var extendFactor = 2; <br/> // response start <br/> req.on(‘response’, function (res) { <br/> if (res.headers[‘content-length’]) { <br/> len = parseInt(res.headers[‘content-length’]); <br/> } <br/> // body init <br/> var body = new Buffer(len); <br/> // chunk recived <br/> res.on(‘data’, function(chunk){ <br/> // extends <br/> if (pointer + chunk.length > len) { <br/> len *= extendFactor; <br/> body = body.copy(new Buffer(len), 0, 0); <br/> console.log(‘proxy extend to’, len); <br/> } <br/> // copy chunk to buf <br/> chunk.copy(body, pointer, 0); <br/> // move pointer <br/> pointer += chunk.length; <br/> }) <br/> // response end <br/> res.on(‘end’, function() { <br/> cb(body.length > pointer ? 
body.slice(0, pointer) : body); <br/> console.log(‘proxy end’, pointer); <br/> }); <br/> }) <br/>} <br/> <br/>// main server <br/>var server = http.createServer(function (req, res){ <br/> // main page <br/> if (req.url == ‘/’) { <br/> res.writeHeader(200); <br/> res.end(page[“pipe.html”]); <br/> // time serve <br/> } else if (req.url == ‘/time’) { <br/> res.writeHeader(200); <br/> res.end(new Date().toString()); <br/> // iframe recv <br/> } else if (req.url.match(/^/iframe//)) { <br/> var clientid = parseInt(req.url.substr(8)); <br/> pipeClient.add(clientid, res, pipeClient.iframe); <br/> console.log(‘iframe connect’, clientid); <br/> // ajax recv <br/> } else if (req.url.match(/^/ajax//)) { <br/> var clientid = parseInt(req.url.substr(6)); <br/> pipeClient.add(clientid, res, pipeClient.ajax); <br/> console.log(‘ajax connect’, clientid); <br/> // request listen <br/> } else if (req.url.match(/^/req//)) { <br/> res.writeHeader(200,{ <br/> ‘Cache-Control’: ‘no-cache, must-revalidate’ <br/> }); <br/> res.end(); <br/> // url parse <br/> var clientid = parseInt(req.url.substr(5, 13)); <br/> // get page <br/> httpQuery(“http://localhost:8000/time”, function (data){ <br/> console.log(data.toString()); <br/> pipeClient.write(clientid, data); <br/> console.log(“write”, clientid, data.length); <br/> }); <br/> // error pages <br/> } else { <br/> res.writeHeader(404, {“Content-Type” : “text/html”}); <br/> res.end(); <br/> } <br/>}); <br/> <br/>var pipeClient = { <br/> timeout : 30000, <br/> client : {}, <br/> prefix : “<script>s(’”, <br/> suffix : “’);</script>”, <br/> iframe : ‘iframe’, <br/> ajax : ‘ajax’, <br/> noise : null, <br/> noiseSize : 1024, <br/> page : null, <br/> init : function(){ <br/> this.noise = new Buffer(1024); <br/> for (var i = 0; i < this.noiseSize; i++) { <br/> this.noise[i] = 32; <br/> } <br/> this.page = readFile([‘iframe.html’]); <br/> }, <br/> add : function(id, res, type) { <br/> <br/> if (type == this.ajax) { <br/> res.writeHeader(200, { <br/> 
‘Cache-Control’: ‘no-cache, must-revalidate’ <br/> }); <br/> res.write(this.noise); <br/> } else { <br/> res.writeHeader(200, { <br/> “Content-Type” : “multipart/x-mixed-replace”, <br/> ‘Cache-Control’: ‘no-cache, must-revalidate’ <br/> }); <br/> res.write(this.page[‘iframe.html’]); <br/> res.write(this.noise); <br/> } <br/> this.client[id] = { <br/> res : res, <br/> type : type, <br/> tm : setTimeout(function(){ <br/> pipeClient.close(id); <br/> }, this.timeout) <br/> }; <br/> }, <br/> close : function (id) { <br/> console.log(“client close”, id) <br/> this.client[id].res.end(); <br/> this.client[id].res = null; <br/> delete this.client[id]; <br/> }, <br/> write : function (id, data) { <br/> clearTimeout(this.client[id].tm); <br/> this.client[id].tm = setTimeout(function(){ <br/> pipeClient.close(id); <br/> }, this.timeout); <br/> this.client[id].res.write(this.format(data, this.client[id].type)); <br/> <br/> }, <br/> format : function(data, type) { <br/> // with iframe <br/> if (type == this.iframe) { <br/> var buf = new Buffer(this.prefix.length + data.length + this.suffix.length); <br/> buf.write(this.prefix, 0, ‘binary’); <br/> data.copy(buf, this.prefix.length, 0); <br/> buf.write(this.suffix, this.prefix.length + data.length); <br/> // with ajax <br/> } else { <br/> var buf = new Buffer(data.length + 8); <br/> // set length <br/> buf.write(data.length.toString(16), 0, ‘binary’); <br/> // space padding <br/> for (var i = data.length.toString(16).length; i < 8; i++) { <br/> buf[i] = 32; <br/> } <br/> // set data <br/> data.copy(buf, 8, 0); <br/> } <br/> console.log(buf.toString()); <br/> return buf; <br/> } <br/>} <br/>pipeClient.init(); <br/> <br/>page = readFile([‘pipe.html’]); <br/>setTimeout(function(){ <br/> server.listen(8000); <br/>}, 500); <br/></pre> <br/> <br/><p>pipe.html: 客户端程序</p> <br/><pre escaped=“true” lang=“javascript” line=“1”> <br/><!DOCTYPE html> <br/><html lang=“zh_CN”> <br/><head> <br/><meta charset=“utf-8” /> <br/><link 
href=‘http://fonts.googleapis.com/css?family=Droid+Serif’ rel=‘stylesheet’ type=‘text/css’> <br/><title>Comet Pipe Demo</title> <br/><script type=“text/javascript” src=“http://www.google.com/jsapi”></script> <br/><script type=“text/javascript”> <br/>google.load(“jquery”, ‘1.4’); <br/>google.setOnLoadCallback(function(){ <br/>$(document.body).ready(function(){ <br/> <br/> <br/>var Comet = function(options) { <br/> // options init <br/> var opt = { <br/> domain : document.domain, <br/> req : ‘/req’, <br/> iframe : ‘/iframe’, <br/> ajax : ‘/ajax’, <br/> timeout : 300, <br/> cb : null, <br/> cl : null, <br/> ajaxFlagLen : 10, <br/> ajaxNoiseLen : 1024 <br/> } <br/> for (k in options) { <br/> opt[k] = options[k]; <br/> } <br/> // calculate bytes length for string <br/> var blen = function(str) { <br/> var arr = str.match(/[^\x00-\xff]/ig); <br/> return arr == null ? str.length : str.length + arr.length; <br/> } <br/> // some arguments init <br/> var pageid = new Date().getTime(); <br/> var type = $.browser.msie ? ‘iframe’ : ‘ajax’; <br/> // public props <br/> var me = { <br/> pageid : pageid, <br/> type : type, <br/> recvUrl : opt[type] + ‘/’ + pageid, <br/> reqUrl : opt.req + ‘/’ + pageid + ‘/’ + type, <br/> conn : null, <br/> div : null, <br/> send : function(){ <br/> $.get(this.reqUrl); <br/> } <br/> } <br/> // init iframe receiver <br/> if (me.type == ‘iframe’) { <br/> // create htmlfire <br/> me.conn = new ActiveXObject(“htmlfile”); <br/> me.conn.open(); <br/> // set domain <br/> me.conn.write("<html><script>document.domain = ‘"+opt.domain+"’</html>"); <br/> me.conn.close(); <br/> me.div = me.conn.createElement(“div”); <br/> me.conn.appendChild(me.div); <br/> me.conn.parentWindow.send = function (data) { <br/> opt.cl ? 
opt.cb.call(opt.cl, data) : opt.cb(data); <br/> }; <br/> me.div.innerHTML = “<iframe id=‘comet’ src=’” + me.recvUrl + “’></iframe>”; <br/> // init ajax receiver <br/> } else { <br/> me.conn = new XMLHttpRequest(); <br/> <br/> me.conn.open(“GET”, me.recvUrl, true); <br/> var pos = 0; <br/> var chunk = ‘’; <br/> var data = ‘’; <br/> var size = -1; <br/> var noise = 0; <br/> me.conn.onreadystatechange = function(){ <br/> if (this.readyState == 3 || this.readyState == 4) { <br/> // chars length <br/> chunk = this.responseText.substr(pos); <br/> pos = this.responseText.length; <br/> if (noise < opt.ajaxNoiseLen) { <br/> noise += chunk.length; <br/> return; <br/> } <br/> // bytes length <br/> if (size < 0) { <br/> size = parseInt(chunk.substr(0, 8), 16); <br/> chunk = chunk.substr(8); <br/> } <br/> data += chunk; <br/> // one done <br/> if (blen(data) == size) { <br/> opt.cl ? opt.cb.call(opt.cl, data) : opt.cb(data); <br/> size = -1; <br/> data = ‘’; <br/> } <br/> } <br/> <br/> } <br/> me.conn.send(); <br/> } <br/> return me; <br/>} <br/> <br/>function showMessage(msg){ <br/> var dom = document.createElement(‘h4’); <br/> dom.innerHTML = msg; <br/> var prt = document.getElementById(‘msg’); <br/> if (prt.hasChildNodes()) { <br/> prt.insertBefore(dom, prt.firstChild) <br/> } else { <br/> prt.appendChild(dom); <br/> } <br/>} <br/>window.setTimeout(function(){ <br/> if ($.browser.msie) { <br/> $(’#btn’).html(’<input type=“submit” id=“go_iframe” value=“Get time with iframe” />’); <br/> } else { <br/> $(’#btn’).html(’<input type=“submit” id=“go_ajax” value=“Get time with ajax” />’); <br/> } <br/> var c = new Comet({ <br/> cb : function(data) { <br/> showMessage(data); <br/> } <br/> }); <br/> <br/> $(’#go_iframe’).click(function(){ <br/> c.send(); <br/> }); <br/> $(’#go_ajax’).click(function(){ <br/> c.send(); <br/> }); <br/>}, 0); <br/> <br/> <br/>}); <br/>}); <br/></script> <br/></head> <br/><body> <br/><div id=“btn”></div> <br/><div id=“msg”></div> <br/><pre 
id=“console”></pre> <br/></body> <br/></html> <br/></pre> <br/><p>iframe.html: ie下iframe模式运行的输出头</p> <br/><pre escaped=“true” lang=“html4strict” line=“1”> <br/><!DOCTYPE html> <br/><html lang=“zh_CN”> <br/><head><meta charset=“utf-8” /></head> <br/><body> <br/><script> <br/>function s(msg) { <br/> window.parent.send(msg); <br/>} <br/></script> <br/> <br/></pre>

6 回复

developerworks上的一篇经典的文章: <br/>http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

30秒后由服务器端向客户端传输数据的通道就关闭了?

对,这里是开发时为了方便,避免触发浏览器的 HTTP 并发连接上限而造成阻塞用的。 <br/>实际系统中可以去掉这块, <br/>或者在 client 中增加重连的功能。

原来就在这里。。。先收藏

pipe 通道在一段时间内没有数据返回就会中断,此后服务器端再向它发送数据就无效了。而监听 response 的 error 事件又捕获不到任何错误,这样会导致坏死的 client 连接越来越多。需要一种机制来处理这个问题。

“另在输出正式数据前需现输出1k的噪音数据” 这一步是可以去掉的。只需设置 'Content-Type': 'text/html;charset=utf-8' 即可。注意其中的 charset 声明是关键,不然浏览器不会实时渲染。测试案例可参考这个:https://gist.github.com/2419203

回到顶部