程序地带

Netty 线程模型分析(一) reactor 线程如何执行任务


前面几篇文章整体分析了Netty应用启动过程,但是留了一些细节情况并没有深入研究,本文主要围绕 reactor线程对任务执行。


EventLoop

EventLoop可以理解为一个单线程多任务的线程池,netty中所有内部任务事件都是由EventLoop来驱动执行的,例如下面例子:


if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}

如果在EventLoop中,直接执行,否则执行execute方法:


@Override
public void execute(Runnable task) {
if (task == null) { // 判断任务
throw new NullPointerException("task");
}
// 判断是否在EventLoop中
boolean inEventLoop = inEventLoop();
// 添加进任务队列
addTask(task);
if (!inEventLoop) { // 如果不在EventLoop,则进入方法尝试启动线程池。
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// 首次进入,唤醒它,让他开始跑?
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

先看唤醒线程 startThread 方法,只会执行一次:


private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

startThread 实际需要执行 doStartThread 来进行 SingleThreadEventExecutor.this.run()。


NioEventLoop 的run方法

前面说,EventLoop是一个单消费者多任务的线程池,那么 NioEventLoop的run 就是这个单消费者:


@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// 报错则重建selector,捕获异常
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// io任务和普通任务执行比例,默认是50%
if (ioRatio == 100) {
try {
// 执行selector的io任务
processSelectedKeys();
} finally {
// 保证会执行其他任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 执行一定时间的任务队列
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
// 如果抛异常了,异常传播
handleLoopException(t);
}
// 每次循环判断是否shutdown了
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
selectStrategy.calculateStrategy

selectStrategy.calculateStrategy(selectNowSupplier, hasTasks() ,主要判断2个状态,即continue或者非continue(执行select)。 DefaultSelectStrategy 的calculateStrategy如下


@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

selectSupplier 则代表依次selectNow:


private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

hasTasks 判断了两个队列,即 SingleThreadEventExecutor 的 taskQueue 和 SingleThreadEventLoop 的 tailTasks 两者都是 Queue<Runnable>。 taskQueue 属于 EventLoop 的正统任务,而 tailTask 则是再最后才执行的。


即如果有任务,则 calculateStrategy 执行 selectNow(即Selector的selectNow) 根据具体返回值确定。 结合后面例子:


如果没有任务,则执行一次select(后面的select(wakenUp.getAndSet(false))),并停顿后在执行如果有任务,那么先执行一次 selectNow。如果执行selectNow结果为其他,例如0,那么执行后面处理
select(wakenUp.getAndSet(false))

具体调用selector再次执行selectNow 地方,里面处理了一个jdk一个死循环bug。


private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 计算下一次延迟任务待执行时间,保证在其之前执行完。
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
// 如果时间不够了,执行一次,立刻返回
break;
}
// 有任务
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞式select这么多秒
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// select到了时间
// 本身就是被用户执行进来
// 有任务
break;
}
if (Thread.interrupted()) {
// 被中断,则需要打印中断
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 轮训512次,仍然没有退出,则可能触发到jdk的死循环异常了,需要重建selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

上文中,主要是围绕:


在下一次定时时间前,完成select事件。控制轮训次数,解决jdk死循环bug 解决方法就是
根据该BUG的特征,首先侦测该BUG是否发生;将问题Selector上注册的Channel转移到新建的Selector上;老的问题Selector关闭,使用新建的Selector替换。
nio 空轮询bug

什么是nio空轮训bug? 简言之就是nio的selector.select 返回了一个nio中未定义的状态码。 而nio无法解决,selectedKeys返回有值,那么就会使得程序中一直循环(因为是听过阻塞循环轮训是否有io事件来进行的)。


poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。


因为selector的select方法,返回numKeys是0,所以下面本应该对key值进行遍历的事件处理根本执行不了,又回到最上面的while(true)循环,循环往复,不断的轮询,直到linux系统出现100%的CPU情况,其它执行任务干不了活,


这里不进行深度搬运: 详细可以看:https://www.cnblogs.com/JAYIT/p/8241634.html


io任务与普通任务

由于nio特征就是selector轮训,但是多个channel绑定到一个reactor线程中,那么reactor线程如何分配轮训任务与普通任务呢? 核心代码这一段:


if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
默认ration=50,即默认比例是1:1,即执行io任务时间和普通任务时间一致。就算是ration=100,即优先普通任务执行,那么也要等当前io任务执行完后,再执行普通任务。通过 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 来确定普通任务执行时间。
processSelectedKeys

执行io任务,主要通过上一步轮训出来的 selecteKeys 执行: NioEventLoop 的 processSelectedKeys 方法


private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

前面提到过,netty用数组替换了原本nio中的set类型的selectKeys变量: 所以是调用 processSelectedKeysOptimized 方法: NioEventLoop 的 processSelectedKeysOptimized 方法


private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) { // 对每个keys进行操作
final SelectionKey k = selectedKeys.keys[i];
// 置位null,这样gc可以回收
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// io事件处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// 显示调用了 cancel 方法后,需要再一次轮训
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}

此时 final Object a = k.attachment(); 获取这个attachment 是啥呢?就是当前的channel本身,说明这个channel有这个事件了。 是在channel向register注册时候,将AbstractNioChannel 注册进入java的niochannel中。 NioEventLoop的processSelectedKey 方法:


private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// 判断条件
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 处理connect事件
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 处理write事件
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 处理读事件,在server端就是连接事件,在client端就是读事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
通过判断 是否在EventLoop中,以及判空事件。区分读事件和写事件进行操作。 其中 unsafe 是从AbstractNioChannel中获取,对于 NioSocketChannel 是 NioSocketChannelUnsafe ,而 NioServerSocketChannel 则是 NioMessageUnsafe 。
runAllTasks

执行完io任务,就是执行普通任务时候,这里以有timeout为例分析:


protected boolean runAllTasks(long timeoutNanos) {
// 判断是否有任务,或者尝试将延时任务放到执行其中
fetchFromScheduledTaskQueue();
// 获取一个任务
Runnable task = pollTask();
if (task == null) {
// 没有任务了,执行tailTask任务,即 tailTask任务优先级最低。
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// 每执行或者循环64个任务,执行一次timeout对比,因为 nanoTime() 很耗时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 执行所有任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

上面执行普通任务方法,整个代码框架不难理解,就是获取任务,执行任务。 fetchFromScheduledTaskQueue 方法很有意思。 在netty中,可以有空闲检测机制,即几秒没有发送或者接受消息,触发一次。这样一来,就需要类似于ScheduledPoolExecutor线程池来进行,但是一个channel只能绑定一个reactor线程啊?这怎么处理?


fetchFromScheduledTaskQueue

fetchFromScheduledTaskQueue 会将 PriorityQueue<ScheduledFutureTask<?>> 最近任务拿出来,如果超时了,就将其放入taskqueue中执行。 由于 PriorityQueue 是以根堆来实现,所以peek 时间复杂度为O(1)。思路还是很妙的。 SingleThreadEventExecutor 的 fetchFromScheduledTaskQueue 方法:


private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 去优先队列拿信息
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// taskQueue已经满了,那么还是放回优先级队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}

SingleThreadEventExecutor 的 pollScheduledTask 方法:


protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}

即拿出最近一个任务,对比 nanoTime 超时时间。


关注博主公众号: 六点A君。 哈哈哈,一起研究Netty: 在这里插入图片描述


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/anLA_/article/details/109757411

随机推荐

2020-12-28

Springboot初建项目,跨域+Swagger的配置跨域配置类@ConfigurationpublicclassWebApiConfigextendsWebMvcCo...

peamp派木 阅读(902)

android 解决android studio编译慢问题

 1、我们编译代码的时候经常会出现build超级慢,有时候可能得等十几二十分钟,因为我们用了谷歌的加载导致会很慢(maven{url"https://...

踏雪羽翼 阅读(763)