Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

前置知识

学习之前需要理清这么几个关键概念:

  • netty相关:EventLoop, EventLoopGroup, ChannelHandler, ChannelPipeline,ChannnelPromise, ChannelFuture
  • nio相关:channel, selector.

ChannelSelector

谈到nio, 那么channel和selector就是绕不开的话题,他们的关系如下:

nio

一个selector和一个线程对应,这也是nio的好处之一:用单线程处理多个客户端的连接;一个selector同时对应多个channel,每个channel将自己注册到selector上,注册的时候会携带自己感兴趣的事件。然后selector就去轮训这些事件,当有某个事件ready时,就去通知对应的channel.

EventLoop

翻译过来是事件循环器,其实就是用来处理各种io事件的,要具备这种能力,那么eventloop必须:

  1. 具有一个selector
  2. 具有自己的线程,在这个线程里可以对selector注册的事件进行遍历

所以,eventloop继承自了Executor,具有线程池的能力,同时含有一个selector.

image

每个channel都对应一个eventloop,在netty启动之时,会将channel注册到eventloop,然后eventloop就会开启死循环,去

  1. 遍历channel感兴趣的事件
  2. 作为executor去执行任务队列里的任务

上述两点便是eventloop的核心作用。我们已AbstractChannel.register来说明这个问题。

AbstractChannel.register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}

在注册channel的时候,如果当前线程是eventloop线程,则直接注册,否则,将register0作为一个任务提交到eventloop的任务队列里,也就是execute方法。

SingleThreadEventExecutor.execute
image

execute方法也是言简意赅,如果当前线程是eventloop线程,则直接addTask,否则先开启一个线程,然后addTask.

SingleThreadEventExecutor.doStartThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
});
}

这里利用Jdk的Executor.execute开启一个线程,并在这个线程里开始了它的死循环,即SingleThreadEventExecutor.this.run()

NioEventloop.run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
}

这个run方法就是EventLoop的核心了,里面有两个比较关键的方法:

  • processSelectedKeys:查询已经就绪的事件并处理,说白了就是处理io事件
  • runAllTasks:处理任务队列里的任务,比如我们通过eventloop.execute提交的任务

image

image

那么至此,eventloop的功能就大概清楚了,它就是在死循环里套死循环,去处理io事件和taskQueue. 当然了,里面还有一些小细节,比如:

  • 为了分配好处理io事件的时间和处理taskQueue的事件,使用了ioratio作为两者的时间比例。

ChannelPipeline

用张图来解释:

image

每个channel都会对应一个channel pipeline, 而这个pipeline就是一个链表,每个节点类型为ChannelHandlerContext,内部包了一个ChannelHandler. 而ChannelHandler又分为ChannelInboundHandlerChannelOutboundHandler, 分别用来处理入站事件和出站事件,差不多像这样:

image

当有入站事件或者出站事件发生时,事件会以责任链模式经过handler.

ChannelPromise ChannelFuture

netty扩展了jdk原生的future. 而promise则是对Netty future的进一步扩展。

image

jdk原生future:

image

netty的future:

image

可以看到,netty扩展的future增加了一些监听器的addremove的方法,以及一些同步方法,如await,sysnc.

再来看下promise:

image

多了一些设置状态的方法,如setSuccess,setFailure

这些扩展意味着什么

netty的future可以addListener,removeLisener, promise可以setSuccess,setFailure,意味着异步操作时,如主线程调了eventloop的线程,只要将promise返回主线程,那么promise在eventloop线程里的任何动作都可以被主线程感知到,比jdk的futrue更加健壮了一些。

总览

一般的启动代码长这样:

image

真正的入口在bind这里,进去看看:

image

首先,initAndRegister,意思是初始化并注册,初始化什么?注册什么?这里其实是初始化一个channel, 并把eventloop的selector注册到channel上,注意这个方法的返回值,是个future,也就是说initAndRegister这个动作是异步进行的,如果注册完成了,即regFuture.isDone(),则进行doBind0操作,否则,添加一个监听器,等initAndRegister完成了会出发它,然后进行doBind0

OK,大致来看,启动分为两个步骤,首先initAndRegister,然后进行doBind0

initAndRegister

首先来看下这个initAndRegister究竟做了些啥。

image

这里init后面会讲。最终调用到的是**AbstractChannel$AbstractUnsage.register**

AbstractChannel$AbstractUnsage.register
image-20220326145847131

这个register方法有两个参数,eventloop是最初配置的bossGroup,ChannelPromise是对Channel的包装。这里要注意,注册的动作必须发生在eventloop的线程里,所以如果当前线程不是eventloop的线程的话,eventloop会起一个自己的线程去做这个事情。

继续看register0:

register0
image

doResigter就是真正的注册过程,在死循环里将channel注册到eventloopselector里。

doResigter
image

相比代码①,注册完成之后的②③④⑤更值得关注。

  • 代码②回调添加handler时候的handlerAdded方法

​ 这里是有细节的,回到我们最初的server端代码:

image-20220404200516407

执行完代码②后,应该输出“HandlerAdded”. 但是这个handler是什么时候添加进pipeline的呢?是serverBootStraphandler()方法吗,不是。回到initAndRegister处,

imageabc

看下这个init方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
void init(Channel channel) throws Exception {
... //省略若干代码
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

可以看到,在init的时候,pipeline就添加一个handler,即ChannelInitializer,并覆盖了initChannel方法,所以启动后pipelie里除了headtail之外,还有一个叫做ChannelInitializerhandler,一共三个handler。再回到代码②,代码②的最终功能就是去调用ChannelInitializerinitChannel方法,而这个initChannle里面,才是把我们在serverbootstrap里定义的handler给加到pipeline里,并在添加之后回调handlerAdded方法。

image

当这个ChannelInitializer完成的它的任务后,就被remove掉了,此时pipeline里就只有head,tail以及我们自己定义的handler了。

  • 代码③设置成功之后,我们上文里提到的doBind方法里的listener就会被出发,从而继续执行doBind0方法
  • 代码④执行添加handler时候的channelRegistered回调
  • 代码⑤首次注册的时候不是active,后面再讲

OK,至此,一个channel就初始化好了,并且注册了到了eventloop上

doBind0

书接上回register0的代码③,设置成功后,doBind方法里的regFuture添加的listener就被触发了,来到了doBind0环节。

image

这里的channel.bind()也是比较有意思的,看似是channelbind,其实最后调的是pipelinebind.(插一嘴,pipieline在netty里相当核心,基于责任链模式,所有的事件都在pipeline里流动,后面会专门起一篇文章说这个事情。)

AbstractChannel.bind()

image

DefaultChannelPipeline.bind()

image

​ 可以看到,pipelinebind是从tail开始的,也就是说,它是从尾部向头部传播的,也就是说,context的类型应该是outboundContenxt

总结一下

服务端启动需要做这么几件事:初始化一个channel, 然后把这个channel注册到selector上,然后把在channel上绑定服务端地址。

整个过程中,pipeline贯穿全局,起着传递事件的作用。

评论