Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

前情提要

Netty学习笔记之服务端启动一文中,我们了解了eventloop的基本功能,知道了它的一生其实就是个死循环,再循环里处理IO事件和taskQueue里面的任务;同时我们也了解到,在服务端启动之初(准确的来讲是在channel注册完成之后调用handlerAdded的时候)会给pipeline里添加一个特殊的handler:ServerBootstrapAcceptor,有了这两点之后,让我们看下Netty服务端启动后是如何利用reactor模型建立新连接的。

正文

当客户端启动并给服务端发送消息后,服务端的eventloop就会监听到对应事件:

NioEventLoop.run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys(); //这里就会支棱起来
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
...
}
}

NioEventLoop.processSelectedKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {

....
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); //调用了这里
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

NioMessageUnsafe.read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void read() {
...
boolean closed = false;
Throwable exception = null;
try {
...
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //这里
}
...
} finally {
// Check if there is a readPending which was not processed yet.
...
}
}
}

最终调用了pipeline.fireChannelRead(readBuf.get(i)), 在服务端启动一文中我们也讲到了pipeline的传播,像这种firexxx的都是从head往后传播的,所以最终会传播到ServerBootstrapAcceptor里,调用ServerBootstrapAcceptorchannelRead,而这里,就是重头戏了。

ServerBootstrapAcceptor.channelRead

image

这里有几个点需要注意下:

  • 代码①:这里的ChannelSockerChannel,而不是ServerSocketChannel,即是客户端的Channel.

  • 代码②:childHandler就是我们在boostrap里配的ChannelInitializer,等到channel真正注册后回调initChannel方法,把MyServerHandler添加到pipeline,见下图,这里的处理和服务端启动一文中doResigter一节中的代码②是一个道理,归根到底就是因为我们想给pipeline添加handler,但是Channel还没注册好,所以采取的一种妥协的方式,即先给pipeline注册一个ChannelInitializer类型的Handler,等到Channel真正注册好之后,再去回调这个特殊HandlerinitChannel方法。

    这里也可以解释bootstraphandlerchildHanlder的区别:前者对应ServerSocketChannel,后者对应SockerChannel.

image

  • 代码③:这里就是将channel注册到eventloop上,和我们在服务端启动一文中看到的serversocketChannel的注册差不多,但是有细微区别,主要体现在

    • 服务端启动时的注册是将serversocketChannel注册到bossGroup里的eventloop.而childChannel注册时是要注册到workerGroup里。

    • 在注册环节,由于当前线程是bossGroup的线程,所以一定会走else的逻辑, 将注册的任务提交到childChannel自己的eventloop

      image

评论