Node系列之Event模块
发布于 8 年前 作者 zhangmingkai4315 5714 次浏览 来自 分享

最近单位事情不太多,决定重新看一下最新的node api,做了一些总结,思路和内容上如有问题还请大家多多指正。

Node的很多核心的API都是构建在Event模块上面的,比如stream模块,可在数据到来时触发data事件,流对象为可读状态触发readable事件,当数据读写完毕后发送end事件等。发送事件其实并非stream本身实现,而是借助于Event对象来实现事件的发送和监听回调绑定。本节内容基于最新的Node v5.7版本,介绍了Node的事件模块的主要API函数的使用以及部分源码阅读,所有例子均可成功运行。

另外欢迎大家访问:Mike的个人Blog jsmean

###创建自己的事件模块

下面我们通过继承EventEmitter来实现自己己的事件对象,该对象监听一个hello事件,当有事件到达的时候,触发回调函数,并打印输出传递的参数。

	import EventEmitter from "events"
	class MyEmitter extends EventEmitter{
		constructor(){
			super()
		}
	}
	const emitter1=new MyEmitter()
	emitter1.on("hello",(username)=>{
		console.log("Welcome:"+username)
	})
	emitter1.emit("hello","Mike")
	
	// Welcome:Mike

注:官方的例子中并未使用super函数去调用父类的构造函数,这样的初始化将导致以下父类的构造函数中的内容得不到执行,无法将domain等对象绑定到新的自定义事件上,但是如果你不使用domain特性的话则不会产生任何影响.

	function EventEmitter() {
	  EventEmitter.init.call(this);
	}
	
	EventEmitter.init = function() {
	  this.domain = null;
	  if (EventEmitter.usingDomains) {
	    // if there is an active domain, then attach to it.
	    domain = domain || require('domain');
	    if (domain.active && !(this instanceof domain.Domain)) {
	      
	      //绑定domain对象到函数对象中
	      this.domain = domain.active;
	    }
	  }
	
	  下面的初始化步骤,在addEventListener函数中做了判断如果为空则执行重新的初始化赋值
	  if (!this._events || this._events === Object.getPrototypeOf(this)._events) {
	    this._events = {};
	    this._eventsCount = 0;
	  }
	
	  this._maxListeners = this._maxListeners || undefined;
	};

###传递错误信息到回调函数

上述的例子中我们传递了一个参数名字到回调函数中,这里我们可以同时传递多个参数,只要在回调函数中包含即可,并且可传递第一个参数为:Error对象,供后续的回调函数来判断。如果Error!=null则代表有错误发生。

	import EventEmitter from "events"
	class MyEmitter extends EventEmitter{
		constructor(){
			super()
		}
	}
	const env=process.env.NODE_ENV
	const emitter1=new MyEmitter()
	
	emitter1.on("Debug",(err,info)=>{
		if(err==null){
			console.log("Debug:"+info)
		}
	})
	
	if(env&&env=="production"){
		emitter1.emit("Debug",new Error("Debug"),null)  //useless	
	}else{
		emitter1.emit("Debug",null,"this is debug info")	
	}

上述例子中当我们使用 NODE_ENV=“production” node 2.js 来执行的时候,不会产生任何的信息,而直接node 2.js则会执行Debug信息输出.

监听行为

####once

node的核心event模块在处理事件的时候,通过多个函数来支持不同的监听方式 比如once,绑定一个事件,但是只处理一次后就不再监听。这里我们看一下源码,once的实现其实将我们自己传递的回调函数做了二次封装,再绑定上封装后的函数,封装的函数首先执行了removeListener()移除了回调函数与事件的绑定,然后才执行的回调函数:

	EventEmitter.prototype.once = function once(type, listener) {
	  if (typeof listener !== 'function')
	    throw new TypeError('"listener" argument must be a function');
	
	  var fired = false;
	
	  function g() {
	    this.removeListener(type, g);
		
	    if (!fired) {
	      fired = true;
	      listener.apply(this, arguments);
	    }
	  }
	
	  g.listener = listener;
	  this.on(type, g);
	
	  return this;
	};

setMaxListeners()

event模块支持设置最大的监听数目,默认情况下当监听数目超过10个则产生一条warn信息,该最大值可通过函数setMaxListeners()来设置,当设置为0的时候默认没有限制。下面的例子中我们生成了20个监听函数,并设置最大监听数目为10,当第11个监听函数绑定上之后,产生了一个warn信息,并附带了trace信息,但是程序仍旧执行了下去,并未暂停退出。

	import EventEmitter from "events"
	class MyEmitter extends EventEmitter{
		constructor(){
			super()
		}
	}
	
	const emitter1=new MyEmitter()
	
	emitter1.setMaxListeners(10);
	
	for(let i=0;i<20;i++){
	   emitter1.on("hello",(username)=>{
		  console.log("Welcome:"+username+"-"+i)
	   })
	}
	emitter1.emit("hello","Mike")
	
	//output

	(node) warning: possible EventEmitter memory leak detected. 11 hello listeners added. Use emitter.setMaxListeners() to increase limit.
	Trace
	    at MyEmitter.addListener (events.js:252:17)
	    at _loop (/Users/zhangmingkai/workshop/node/event/4.js:32:11)
	    at Object.<anonymous> (/Users/zhangmingkai/workshop/node/event/4.js:38:2)
	    at Module._compile (module.js:413:34)
	    at Object.Module._extensions..js (module.js:422:10)
	    at Module.load (module.js:357:32)
	    at Function.Module._load (module.js:314:12)
	    at Function.Module.runMain (module.js:447:10)
	    at startup (node.js:139:18)
	    at node.js:999:3
	Welcome:Mike-0
	Welcome:Mike-1
	Welcome:Mike-2
	...
	Welcome:Mike-18
	Welcome:Mike-19

addListener与removeListener

node的源码写的比较清晰,如下面代码所示,我们可以看出on函数其实不过是一个addListener函数的别名而已,removeListener(事件名称,回调函数)则执行清理操作,移除回调函数和绑定的事件。

	EventEmitter.prototype.on = EventEmitter.prototype.addListener;
	
	EventEmitter.prototype.addListener = function addListener(type, listener) {
	  var m;
	  var events;
	  var existing;
		
	  //判断是否为函数,如果类型不是函数类型则抛出异常
	  if (typeof listener !== 'function')
	    throw new TypeError('"listener" argument must be a function');
	  
	  // 获得事件的列表
	  events = this._events;
	  if (!events) {
	    //初始化事件对象和事件计数
	    events = this._events = {};
	    this._eventsCount = 0;
	  } else {
	    // To avoid recursion in the case that type === "newListener"! Before
	    // adding it to the listeners, first emit "newListener".
	    
	    // 当有新的绑定产生的时候,发射一个newListener的事件
	    if (events.newListener) {
	      this.emit('newListener', type,
	                listener.listener ? listener.listener : listener);
	
	      // Re-assign `events` because a newListener handler could have caused the
	      // this._events to be assigned to a new object
	      events = this._events;
	    }
	    // 绑定同一个事件的回调函数列表
	    existing = events[type];
	  }
	
	  if (!existing) {
	    //第一次绑定的时候直接将函数赋值给existing变量
	    existing = events[type] = listener;
	    ++this._eventsCount;
	  } else {
	    if (typeof existing === 'function') {
	      // 如果是第二次绑定则将原来的existing变量变为一个函数数组
	      existing = events[type] = [existing, listener];
	    } else {
	      // 第二次之后的绑定直接push即可
	      existing.push(listener);
	    }
	    // 判断是否超出了默认的或者设置的最大绑定数目   
	    if (!existing.warned) {
	      m = $getMaxListeners(this);
	      if (m && m > 0 && existing.length > m) {
	        existing.warned = true;
	        if (!internalUtil)
	          internalUtil = require('internal/util');
	
	        internalUtil.error('warning: possible EventEmitter memory ' +
	                           'leak detected. %d %s listeners added. ' +
	                           'Use emitter.setMaxListeners() to increase limit.',
	                           existing.length, type);
	        console.trace();
	      }
	    }
	  }
	  //返回this则可执行级联操作
	  return this;
	};

events模块与监听者设计模式

在设计模式中,广义上的监听者模式意味着了一个主体,维护一个或者多个称之为监听者的对象,并且当主体发生变化的时候主动通知这些监听者。这样监听者就不用一直去询问主体是否发生了变化。就像我们现实生活中去办理一些业务的时候,工作人员一般会记录一些我们的信息比如手机号,一旦他们处理完成后就联系我们,而不用我们隔一段时间就去查询一下是否处理完毕了。

监听者模式的最大好处是解耦了两个对象之间的联系,甚至是独立的两个服务。比如我们使用Redis或者RabbitMQ的订阅发布模式。数据的生产者和消费者可以是不同的组件,甚至是不同语言编写的程序等。同样在node的代码世界里,events模块已经帮我们实现了该模式,我们可以单独使用events作为中间件来处理事件的传递,也可以继承实现自己的events来处理发生与接收事件。

2 回复

最近在写node后台,就了解了很多的第三方模块,最后发现其实node原本就提供了这些功能,完成不必要使用第三方模块了。哈哈,谢谢你的分享!

多读node的文档和源码,可能会有不一样的收获

回到顶部