Node.js的pipe和unpipe问题,可能是个bug或?
发布于 10 年前 作者 wangjingbo 11459 次浏览 最后一次编辑是 8 年前 来自 问答

1、看代码

var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
	socket.setTimeout(10*1000);
	socket.pause();
	 //socket.pipe(file); //推荐仅调用一次pipe
	socket.on('timeout',function(){
		socket.resume();
		socket.pipe(file);  //结果是:每一次调用都会增加一遍数据写入
	});

	socket.on('data',function(data){
		socket.unpipe(file); //无法成功,????????????????
        socket.pause();
	});
});
server.listen('8431','localhost');

2、测试结果 第一次输入a,第二次输入b,第三次输入c,结果变为abbccc而不是abc 3、分析 猜测:每次调用pipe就生成一个全新的隧道,因此,每次写数据都要从递增的多个隧道进行写入——就变成N倍了。

4、用unpipe取消每次递增的隧道

var net=require('net'); 
var file=require('fs').createWriteStream('./message.txt'); 
var server=net.createServer(); 
server.on('connection',function(socket){ 
    socket.setTimeout(10*1000); 
    socket.pause(); 
     //socket.pipe(file); 
    socket.on('timeout',function(){ 
        socket.resume(); 
        socket.pipe(file); 
    }); 
 
    socket.on('data',function(data){ 
        socket.unpipe(file); //无法成功,???????????????? 
               socket.pause(); 
    }); 
}); 
server.listen('8431','localhost'); 

结果:失败。unpipe根本无法取消pipe创建的隧道。 原因:参考https://groups.google.com/forum/#!topic/nodejs/75R0dOZV1UI 反正没找到解决方案(除非调用socket.removeAllListeners(); 但socket就无法继续监听data了,除非另建监听器,在循环下基本不可能)。

5、根据原因,仅调用一次pipe

var net=require('net'); 
var file=require('fs').createWriteStream('./message.txt'); 
var server=net.createServer(); 
server.on('connection',function(socket){ 
    socket.setTimeout(10*1000); 
    socket.pause(); 
        socket.pipe(file); // 只调用一次pipe 
    socket.on('timeout',function(){ 
        socket.resume(); 
        //socket.pipe(file); //注释,避免多次调用pipe 
    }); 
    socket.on('data',function(data){ 
        socket.pause(); 
    }); 
}); 
server.listen('8431','localhost'); 

结果:OK

6、问题:依然无解。 还是无法unpipe,除非没有任何嵌套,例如:

var net=require('net');
var fs=require('fs');
var file=fs.createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
    socket.pipe(file,{end:false});       //当客户端关闭时,不会立即关闭file的写操作
    socket.on('data',function(data){         
         if(data.toString()=='q'){       //收到字符q之后,不会再向file写入新的数据(但字符q可以被写入)
           socket.unpipe(file);         //取消指定pipe隧道
         }                        
    });
});
server.listen(8431,'localhost');
server.on('error',
function(err) {
    console.log('出错了,err=' + err.code);
});
server.on('close',
function() {
    console.log('本服务器要关闭了。');
});

上面结果:可以unpipe成功。

但,有嵌套就无法成功,例如:

var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
    socket.pause();
    setTimeout(function(){ // 本例子,仅能在每个新socket开始的30秒内进行暂停数据写入
        socket.resume();
        socket.pipe(file);
    }, 30000);
    socket.on('data',function(data){ //resume方法可以触发data事件,但data事件无法阻止pipe写入文件
        socket.unpipe();
        console.log('客户端触发data事件,数据为:'+data.toString());
    });
});
 
server.listen(8431,'localhost');

结果:无法unpipe成功。因为有嵌套,例如setTimeout。

真的是这样么? 我不知道,仅仅通过测试,瞎猜的! 请高人指点!

2 回复

楼主,你的问题解决了么?我想在forever模块里data时间触发时判断是否创建当前日期的新日志,unpipe掉旧日志,pipe一个新日志。pipe有效,unpipe无效。具体可以看这里 https://cnodejs.org/topic/54dc0f5550194b3e191bc483

  socket.unpipe(file); //无法成功,????????????????

用的不对,自然无法unpipe。 不能在on(‘data’)里面unpipe,因为当前的pipe还有很多工作没执行完,需要把unpipe放在setImmediate / process.nextTick里。 打个比方吧,如果想要拆一个房子,肯定不能在房子里面拆,会砸到自己

回到顶部