node源码粗读(12):通过`net.createConnection`来看socket的Event Emitter的实现
发布于 7 年前 作者 xtx1130 3415 次浏览 来自 分享

这篇文章主要从net.createConnection入手,详细讲解socket常用的connet、data、error、close等事件是如何实现的。

socket的创建

相信用过net.createConnection都比较了解了。这个API会创建一个客户端的socket链接:

net.createConnection(options[, connectListener])

其中connectListener将被添加为返回 socket 上的 ‘connect’ 事件上的监听器。 简单了解了API,我们直奔./lib/net.js来看一下是如何实现的:

// ./lib/net.js connect构造函数
function connect(...args) {
  var normalized = normalizeArgs(args);
  var options = normalized[0];
  // ...
  var socket = new Socket(options);
  // ...
  return Socket.prototype.connect.call(socket, normalized);
}

首先我们关注一下new Socket(options)的实现:

const {
  TCP,
  TCPConnectWrap,
  constants: TCPConstants
} = process.binding('tcp_wrap');
// ... socket构造函数
function Socket(options) {
  // ...
  this._hadError = false;
  this._handle = null;
  this._parent = null;
  this._host = null;
  this[kLastWriteQueueSize] = 0;
  this[kTimeout] = null;

  // ...
  stream.Duplex.call(this, options);
  this.allowHalfOpen = Boolean(allowHalfOpen);

  if (options.handle) {
    this._handle = options.handle; // private
    this[async_id_symbol] = getNewAsyncId(this._handle);
  } else if (options.fd !== undefined) {
    const { fd } = options;
    this._handle = createHandle(fd, false);
    this._handle.open(fd);
    this[async_id_symbol] = this._handle.getAsyncId();
    // ...
  }
  initSocketHandle(this);
  // ...
  }
 // ...
}
util.inherits(Socket, stream.Duplex);
// ... createHandle 函数
function createHandle(fd, is_server) {
  const type = TTYWrap.guessHandleType(fd);
  // ...
  if (type === 'TCP') {
    return new TCP(
      is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
    );
  }
  throw new ERR_INVALID_FD_TYPE(type);
}

通过上面这一段代码我们可以得到两个关键的信息:

  • this._handle = createHandle(fd, false);TCP的实例
  • stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法 接下来是connect:
Socket.prototype.connect = function(...args) {
  if (pipe) {
    // ...
  } else {
    lookupAndConnect(this, options);
  }
}
/// connect调用的核心函数
function internalConnect(
  self, address, port, addressType, localAddress, localPort) {
   if (addressType === 6 || addressType === 4) {
    const req = new TCPConnectWrap();
    req.oncomplete = afterConnect;
    req.address = address;
    req.port = port;
    req.localAddress = localAddress;
    req.localPort = localPort;
    if (addressType === 4)
      err = self._handle.connect(req, address, port);
    // ...
  } else {
    // ...
  }
}

里面需要注意的地方:

  • self._handle.connect(req, address, port);这一段是真正的实现socket连接的地方
  • req.oncomplete = afterConnect; 这一段是连接成功之后的回调

this._handle和’connect’事件

在上一节我们知道this._handleTCP的实例,进而我们长驱直入tcp_wrap.cc

void TCPWrap::Initialize(Local<Object> target,
                         Local<Value> unused,
                         Local<Context> context) {
  // ...
  Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
  Local<String> tcpString = FIXED_ONE_BYTE_STRING(env->isolate(), "TCP");
  t->SetClassName(tcpString);
  t->InstanceTemplate()->SetInternalFieldCount(1);
  t->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "reading"),
                             Boolean::New(env->isolate(), false));
  t->InstanceTemplate()->Set(env->owner_string(), Null(env->isolate()));
  t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));
  t->InstanceTemplate()->Set(env->onconnection_string(), Null(env->isolate()));
   // ...
  env->SetProtoMethod(t, "connect", Connect);
  // ...
}
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
  // ...
  Environment* env = Environment::GetCurrent(args);

  int type_value = args[0].As<Int32>()->Value();
  TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);

  ProviderType provider;
  switch (type) {
    case SOCKET:
      provider = PROVIDER_TCPWRAP;
      break;
    case SERVER:
      provider = PROVIDER_TCPSERVERWRAP;
      break;
    default:
      UNREACHABLE();
  }

  new TCPWrap(env, args.This(), provider);
}

TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
    : ConnectionWrap(env, object, provider) {
  int r = uv_tcp_init(env->event_loop(), &handle_);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_tcp_init() returns void.
}

通过上面的代码,我们可以清晰地看到TCPWrap向js层抛出的构造函数TCP,重点注意下

env->SetProtoMethod(t, “connect”, Connect);

这里对TCP的实例添加了connect方法,意即this._handle.connect调用的为TCPWrap::Connect

void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args) {
  // ...
  if (err == 0) {
    AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
    ConnectWrap* req_wrap =
        new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_TCPCONNECTWRAP);
    err = req_wrap->Dispatch(uv_tcp_connect,
                             &wrap->handle_,
                             reinterpret_cast<const sockaddr*>(&addr),
                             AfterConnect);
    // ...
  }
  args.GetReturnValue().Set(err);
}

通过这段代码可以清晰地看到,uv_tcp_connect的cb为AfterConnect,libuv中的逻辑就不做详细介绍了,有兴趣的可以观光一下tcp.cuv__tcp_connect函数,最终会走入到uv__io_poll的轮回中。我们重点看一下AfterConnect,代码在connection_wrap.cc中:

void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req,
                                                    int status) {
  // ...
  req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
  delete req_wrap;
}

可以看到最终调用的函数为oncomplete,这时又回到了net.js中:

const req = new TCPConnectWrap();
req.oncomplete = afterConnect;

afterConnect的定义中有这样一句:

function afterConnect(status, handle, req, readable, writable) {
  // ...
  if (status === 0) {
    // ...
    self.emit('connect');
    self.emit('ready');
  }
}

最终在这里emit出了connect方法,即连接成功的回调,也就是net.createConnection(options[, connectListener])connectListener函数最终被emit触发的地方。

stream和’data’事件

不知道读者还有没有记得,文中的第一节曾分析过:

stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法

没错,'data’事件就是在stream.Duplex中emit的,在上一篇文章中我曾介绍过stream的写入过程,而在这里值stream的读取过程,刚才在分析TCPWrap的时候,在c++中构造js的TCP构造函数时候有这样一句:

t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));

由于socket继承了stream.Duplex,整个的溯源过程就不详细查找了,最终onread的触发在stream_base.cc中:

void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
  Local<Value> argv[] = {
    Integer::New(env->isolate(), nread),
    buf
  };
  // ...
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}

而在js中的触发则在net.js:

function onread(nread, buffer) {
  // ...
  if (nread > 0) {
    // ...
    var ret = self.push(buffer);
    // ...
  }
}

push方法则是从stream继承来的,视线转移到_stream_readable.js中:

Readable.prototype.push = function(chunk, encoding) {
  // ...
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}

readableAddChunk函数的作用是对流不断进行拼接并在过程中进行容错处理:

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  // ...
  if (addToFront) {
        if (state.endEmitted)
          stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
        else
          addChunk(stream, state, chunk, true);
      } else if (state.ended) {
        stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
      } else if (state.destroyed) {
        return false;
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            addChunk(stream, state, chunk, false);
          else
            maybeReadMore(stream, state);
        } else {
          addChunk(stream, state, chunk, false);
        }
}

在这里我只截取了关键代码,**可以看到在addChunk的同时,如果出错会立刻emit出’error’事件。在上游net.js中还有一些情况会emit 'error’事件,就不做详尽分析了。**接下来我们看下addChunk:

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    state.awaitDrain = 0;
    stream.emit('data', chunk);
  } else {
    // update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

通过stream.emit('data', chunk);会emit出’data’事件,并且把chunk传入到参数中。

总结

至此,socket的’connect’、‘data’、'error’事件便分析完毕了。有意思的是,这三个事件的来源不尽相同:

  • 'connect’是通过TCPWrap类及一系列函数触发的emit
  • 'data’则是和StreamBase类相关
  • 'error’则相当于全程的兜底,不管在哪里出了问题,总会emit出’error’事件。

by 小菜
原文地址:https://github.com/xtx1130/blog/issues/26 欢迎watch和star。如果文中有表述错误的地方还请大神斧正

回到顶部