nodejs解析http协议源码解析
发布于 4 年前 作者 theanarkh 4740 次浏览 来自 分享

之前文章讲到nodejs创建一个服务并且建立tcp连接的过程。接下来分析一下,在建立tcp连接后,nodejs是如何解析http协议的。我们首先看一下nodejs在建立tcp连接时执行net.js层的回调时做了什么操作。下面是核心代码。

// clientHandle代表一个和客户端建立tcp连接的实体
function onconnection(err, clientHandle) {
  var handle = this;
  var self = handle.owner;

  debug('onconnection');

  if (err) {
    self.emit('error', errnoException(err, 'accept'));
    return;
  }
  // 建立过多,关掉
  if (self.maxConnections && self._connections >= self.maxConnections) {
    clientHandle.close();
    return;
  }

  var socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect
  });
  socket.readable = socket.writable = true;
}
...

建立tcp连接后nodejs新建了一个Socket实体。我们看一下new Socket的核心逻辑。

 stream.Duplex.call(this, options);
 this._handle = options.handle; 
 initSocketHandle(this);
 // 触发底层注册一些函数
 this.read(0);
function initSocketHandle(self) {
	if (self._handle) {
	    self._handle.owner = self;
	    // 这个函数在底层有数据时会回调
	    self._handle.onread = onread;
	    self[async_id_symbol] = getNewAsyncId(self._handle);
	}
}

另一个重点是read(0)这个函数的逻辑。

Socket.prototype.read = function(n) {
  if (n === 0)
    return stream.Readable.prototype.read.call(this, n);

  this.read = stream.Readable.prototype.read;
  this._consuming = true;
  return this.read(n);
};

read函数最终会调用_read函数读数据。我们看一下ReadableStream里的read。

在read里会执行_read
this._read(state.highWaterMark);

而_read是由Socket函数实现的。因为Socket继承了ReadableStream。_read执行了一个很重要的操作。

this._handle.readStart();

_handle代表的是一个TCP对象,即tcp_wrapper.cc里创建的。所以我们去看tcp_wrapper的代码。但是没找到该函数。原来该函数在tcp_wrapper的子类stream_wrap里实现的。

int LibuvStreamWrap::ReadStart() {
  return uv_read_start(stream(), [](uv_handle_t* handle,
                                    size_t suggested_size,
                                    uv_buf_t* buf) {
    static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
  }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
    static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
  });
}

其实就是调用了libuv的uv_read_start函数。该函数在stream.c里。我们继续往下看。

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;
  // 注册读事件
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

主要是注册了事件和回调函数,该函数会在数据到来时被执行。到此,就告一段落了。现在就要等数据的到来。上篇文章我们分析过,数据到来时执行的函数是uv__stream_io。

 if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream)

有读事件到来的时候,uv__stream_io会调uv_read函数。

	buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
   if (buf.base == NULL || buf.len == 0) {
     /* User indicates it can't or won't handle the read. */
     stream->read_cb(stream, UV_ENOBUFS, &buf);
     return;
   }

这两个函数就是刚才注册的。我们再次回到nodejs的c++代码。看一下这两个函数做了什么。

void LibuvStreamWrap::OnUvRead(...) {
	EmitRead(nread, *buf);
}

EmitRead在stream_base-inl.h里定义,他又是一个子类。

inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
  if (nread > 0)
    bytes_read_ += static_cast<uint64_t>(nread);
  listener_->OnStreamRead(nread, buf);
}

在stream_base.c定义

OnStreamRead() {
	 stream->CallJSOnreadMethod(nread, obj);
}
CallJSOnreadMethod() {
	wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}

在env.h里我们知道onread_string就是onread,所以这里就是执行js层的onread函数。该函数就是在new Socket的时候注册的。我们回到js的代码。

	function onread() {
		var ret = self.push(buffer);
	}

push函数是在readableStream里定义的。他经过一系列处理触发ondata事件。

function addChunk(...) {
	...
	stream.emit('data', chunk);
	...
}

那是谁监听了ondata事件呢,我们首先看一下nodejs在建立一个连接到再_http_server.js层做了什么处理。

function Server(requestListener) {
  if (!(this instanceof Server)) return new Server(requestListener);
  net.Server.call(this, { allowHalfOpen: true });
  // 收到http请求时执行的回调
  if (requestListener) {
    this.on('request', requestListener);
  }
  this.httpAllowHalfOpen = false;
  // 建立tcp连接的回调
  this.on('connection', connectionListener);

  this.timeout = 2 * 60 * 1000;
  this.keepAliveTimeout = 5000;
  this._pendingResponseData = 0;
  this.maxHeadersCount = null;
}

connectionListener代码如下。

function connectionListener(socket) {
  defaultTriggerAsyncIdScope(
    getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  );
}

function connectionListenerInternal(server, socket) {
   httpSocketSetup(socket);
	if (socket.server === null)
    socket.server = server;
	if (server.timeout && typeof socket.setTimeout === 'function')
    socket.setTimeout(server.timeout);
  
  socket.on('timeout', socketOnTimeout);
  var parser = parsers.alloc();
  parser.reinitialize(HTTPParser.REQUEST);
  parser.socket = socket;
  socket.parser = parser;
  parser.incoming = null;

  // Propagate headers limit from server instance to parser
  if (typeof server.maxHeadersCount === 'number') {
    parser.maxHeaderPairs = server.maxHeadersCount << 1;
  } else {
    // Set default value because parser may be reused from FreeList
    parser.maxHeaderPairs = 2000;
  }

  var state = {
    onData: null,
    onEnd: null,
    onClose: null,
    onDrain: null,
    outgoing: [],
    incoming: [],
    outgoingData: 0,
    keepAliveTimeoutSet: false
  };
  // 收到tcp连接中的数据时回调
  state.onData = socketOnData.bind(undefined, server, socket, parser, state);
  state.onEnd = socketOnEnd.bind(undefined, server, socket, parser, state);
  state.onClose = socketOnClose.bind(undefined, socket, state);
  state.onDrain = socketOnDrain.bind(undefined, socket, state);
  socket.on('data', state.onData);
  socket.on('error', socketOnError);
  socket.on('end', state.onEnd);
  socket.on('close', state.onClose);
  socket.on('drain', state.onDrain);
  parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state);

  // We are consuming socket, so it won't get any actual data
  socket.on('resume', onSocketResume);
  socket.on('pause', onSocketPause);

  // Override on to unconsume on `data`, `readable` listeners
  socket.on = socketOnWrap;

  // We only consume the socket if it has never been consumed before.
  if (socket._handle) {
    var external = socket._handle._externalStream;
    if (!socket._handle._consumed && external) {
      parser._consumed = true;
      socket._handle._consumed = true;
      parser.consume(external);
    }
  }
  parser[kOnExecute] =
    onParserExecute.bind(undefined, server, socket, parser, state);

  socket._paused = false;
}

主要是注册了一系列的回调函数,这些函数在收到数据或者解析数据时会被执行。所以收到数据后执行的函数是socketOnData。该函数就是把数据传进http解析器然后进行解析。

	function socketOnData(server, socket, parser, state, d) {
	 	...
	    var ret = parser.execute(d);
	    onParserExecuteCommon(server, socket, parser, state, ret, d);
	}

我们先看一下parser是个什么。parser是在_http_server.js的onconnection回调里,parsers.alloc()分配的。而parsers又是个啥呢?他在_http_common.js里定义。

var parsers = new FreeList('parsers', 1000, function() {
  var parser = new HTTPParser(HTTPParser.REQUEST);

  parser._headers = [];
  parser._url = '';
  parser._consumed = false;

  parser.socket = null;
  parser.incoming = null;
  parser.outgoing = null;

  // Only called in the slow case where slow means
  // that the request headers were either fragmented
  // across multiple TCP packets or too large to be
  // processed in a single run. This method is also
  // called to process trailing HTTP headers.
  parser[kOnHeaders] = parserOnHeaders;
  parser[kOnHeadersComplete] = parserOnHeadersComplete;
  parser[kOnBody] = parserOnBody;
  parser[kOnMessageComplete] = parserOnMessageComplete;
  parser[kOnExecute] = null;

  return parser;
});

class FreeList {
  constructor(name, max, ctor) {
    this.name = name;
    this.ctor = ctor;
    this.max = max;
    this.list = [];
  }

  alloc() {
    return this.list.length ?
      this.list.pop() :
      this.ctor.apply(this, arguments);
  }

  free(obj) {
    if (this.list.length < this.max) {
      this.list.push(obj);
      return true;
    }
    return false;
  }
}

他其实是管理http解析器的。重点是HTTPParser,他定义在node_http_parser.cc是对http解析器的封装。真正的解析器在http_parser.c。回到刚才的地方。nodejs收到数据后执行 parser.execute(d);execute函数对应的是node_http_parser里的Execute。该函数进行了重载。入口是下面这个函数。

static void Execute(const FunctionCallbackInfo<Value>& args) {
	Local<Value> ret = parser->Execute(buffer_data, buffer_len);
}


Local<Value> Execute(char* data, size_t len) {
      http_parser_execute(&parser_, &settings, data, len);
 }

http_parser_execute函数定义在http_parser.c,该函数就是进行真正的http协议解析。它里面会有一些钩子函数。在解析的某个阶段会执行。例如解析完头部。

if (settings->on_headers_complete) {
      switch (settings->on_headers_complete(parser)) {
       	...
 	  }
}

具体的定义在node_http_parser.cc

const struct http_parser_settings Parser::settings = {
  Proxy<Call, &Parser::on_message_begin>::Raw,
  Proxy<DataCall, &Parser::on_url>::Raw,
  Proxy<DataCall, &Parser::on_status>::Raw,
  Proxy<DataCall, &Parser::on_header_field>::Raw,
  Proxy<DataCall, &Parser::on_header_value>::Raw,
  Proxy<Call, &Parser::on_headers_complete>::Raw,
  Proxy<DataCall, &Parser::on_body>::Raw,
  Proxy<Call, &Parser::on_message_complete>::Raw,
  nullptr,  // on_chunk_header
  nullptr   // on_chunk_complete
};

这里我们以on_header_complete钩子来分析。

const uint32_t kOnHeadersComplete = 1
int on_headers_complete() {
	Local<Value> cb = obj->Get(kOnHeadersComplete);	
	 MakeCallback(cb.As<Function>(), arraysize(argv), argv);
}

最后会执行kOnHeadersComplete这个函数。我们看到这个kOnHeadersComplete 等于1,其实这个是在js层复赋值的。在_http_common.js中的开头。

const kOnHeadersComplete = HTTPParser.kOnHeadersComplete | 0;

然后在新建一个http解析器的函数注册了该函数。

parser[kOnHeadersComplete] = parserOnHeadersComplete;

所以当解析头部结束就会执行parserOnHeadersComplete。

function parserOnHeadersComplete(...) {
	parser.incoming = new IncomingMessage(parser.socket);
	...
	return parser.onIncoming(parser.incoming, shouldKeepAlive);
}

新建了一个IncomingMessage对象,然后执行_http_server.js注册的回调onIncoming 。该回调函数也是再建立tcp连接时注册的。

function parserOnIncoming() {
	var res = new ServerResponse(req);
	...
	server.emit('request', req, res);
}

生成一个ServerResponse对象,然后触发request事件。该函数是在我们执行http.createServer时传进行的函数。

function Server(requestListener) {
  ...
  // 收到http请求时执行的回调
  if (requestListener) {
    this.on('request', requestListener);
  }
}

最后在我们的回调里就拿到了这两个对象。但是这时候只是解析完了头部,request对象里还拿不到body的数据。我们需要自己获取。

	var str = "";    
    req.on('data', (data) => {
        str += data;   
    });    
    req.on('end',() => {})
回到顶部