q

qianxws

V1

2022/12/28阅读:45主题:橙心

netty中ServerBootstrap启动源码分析(下)

1.SingleThreadEventExecutor.this.run()

SingleThreadEventExecutor.this.run();

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                  case SelectStrategy.CONTINUE:
                      continue;
                  case SelectStrategy.BUSY_WAIT:

                  case SelectStrategy.SELECT:
                       select(wakenUp.getAndSet(false));
                       if (wakenUp.get()) {
                          selector.wakeup();
                       }
                  default:
                }
            } catch (IOException e) {
                //
            }

            if (ioRatio == 100) { // 默认50
                // 处理IO事件
                processSelectedKeys();
                // 处理其他事件
                runAllTasks();
            } else if (strategy > 0) {
                // 处理IO事件
                processSelectedKeys();
                // 处理其他事件
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            } else {
                // 处理其他事件
                runAllTasks(0);
            }
        } catch (Throwable t) {
            //
        } finally {
            closeAll();
        }
    }
}

NioEventLoop 方法的作用主要是

  • 轮询 IO 事件
  • 处理 IO 事件
  • 处理其他非 IO 任务

下面我们看看 select 方法是如何进行事件轮询的

2.select()

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // 记录循环次数
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    // 第一次轮循,非阻塞select
                    selector.selectNow();
                    selectCnt = 1;
                }
                // 超时退出
                break;
            }

            // 如果有任务且退出select()成功,非阻塞select
            if (hasTasks() && wakenUp.compareAndSet(falsetrue)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // 上面设置的超时时间未到,这里可以阻塞select去轮询感兴趣的事件
            int selectedKeys = selector.select(timeoutMillis);
            // 轮询次数+1
            selectCnt ++;

            // selectedKeys != 0      表示轮询到了事件
            // oldWakenUp              当前的操作是否需要唤醒
            // wakenUp.get()          是否被外部线程唤醒
            // hasTasks()             任务队列中是否有新任务了
            // hasScheduledTasks()   当时定时任务队列里面是否有任务
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 进行了一次阻塞操作
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // 轮询次数超过阈值,可能发生空轮询
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }
    } catch (CancelledKeyException e) {
        //
    }
}
  1. 如果超过了 截止时间,selector.selectNow(); 直接退出
  2. 如果任务队列中出现了新的任务 selector.selectNow(); 直接退出
  3. 经过了上面两次判断后, netty 进行阻塞式 select(time) ,可能会会出现空轮询的 Bug
  4. 如果经过阻塞式的轮询之后,出现的感兴趣的事件,或者任务队列又有新任务了,或者定时任务中有新任务了,或者被外部线程唤醒了 都直接退出循环
  5. 如果前面都没出问题,最后检验是否出现了 JDK 空轮询的 BUG

什么是 JDK 空轮询

问题产生于 linux 的 epoll。

如果一个 socket 文件描述符,注册的事件集合码为 0,然后连接突然被对端中断,那么 epoll 会被 POLLHUP 或者有可能是 POLLERR 事件给唤醒,并返回到事件集中去。

这意味着,Selector 会被唤醒,即使对应的 channel 兴趣事件集是 0,并且返回的 events 事件集合也是 0。

简言之,jdk 认为 linux 的 epoll 告诉我事件来了,但是 jdk 没有拿到任何事件(READ、WRITE、CONNECT、ACCPET)。但此时 select()方法不再选择阻塞了,而是选择返回了 0。

如何解决空轮询

  1. 取消对应的 key,马上刷新 Selector。就是在重现步骤中的第 4 步,立马调用 selector.selectNow 刷新一次 selector

  2. 如果注册到 selector 兴趣事件集为 0,则直接取消注册。 如果注册到 selector 兴趣事件集不为 0,则需要将 linux epoll 事件 POLLHUP/POLLERR 转化为 OP_READ 或者 OP_WRITE。由谁决定转化呢,笔者认为应该由 jdk。这样程序就有机会探测到 IO 异常

  3. 丢弃旧的 selector,重新构造一个(netty 的解决方案)

netty是如何解决空轮询的

想解决空轮询,先要判断这次轮询是否为空轮询

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    // 不是空轮询,重置计数器
    selectCnt = 1;
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // 已发生空轮询,解决方案
    break;
}
currentTimeNanos = time;

我们发现netty中这样的判断逻辑

time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos

time 当前时间 t1

TimeUnit.MILLISECONDS.toNanos(timeoutMillis) 任务执行时长(正常情况应该阻塞的时长) t2

currentTimeNanos 开始进入轮询的时间 t3

从代码逻辑看,正常情况下,执行到这里timeoutMills肯定为一个正数,因为在循环开始就判断了,如果为负数,则说明有任务,直接break。在空轮询情况下,time与currentTimeNanos是近似相等的,所以可以看出如果if中的判断式左边在空轮询的条件下可能为负数

在空轮询次数达到阈值后,会采用重建selector的方法解决

就是把原selector的属性等重新赋值给新selector,并执行selectNow方法

3.processSelectedKeys()

处理selectedKeys

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

以processSelectedKeysOptimized()为例

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        

    Iterator<SelectionKey> i = selectedKeys.iterator();
    
    // 迭代selectKeys
    for (;;) {
        final SelectionKey k = i.next();
        // 从selectKey中取出Channel
        final Object a = k.attachment();
        
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // 处理selectKey相关事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {

            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }


    }
}

processSelectedKey()

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        int readyOps = k.readyOps();
        // 处理连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 处理写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 处理读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        //
    }
}

比如处理读事件,AbstractNioByteChannel.NioByteUnsafe#read()

public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1.创建ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            // 2.将数据load到ByteBuf中
            allocHandle.lastBytesRead(doReadBytes(byteBuf));

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 3.数据进入到了pipeline中,依次调用inBound接口的handler方法
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        //
    } finally {
        //
    }
}

4.runAllTasks()

runAllTasks() -> runAllTasksFrom() -> runAllTasksFrom()

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    // 自旋
    for (;;) {
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

safeExecute(),直接调用任务的run()

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        //
    }
}

5.NioEventLoop启动总结

NioEventLoop创建之后,就会通过自身的run()方法启动了,比如SingleThreadEventExecutor.this.run()。这里面是一个for(;;)循环

循环中会计算出一个IO的选择策略,这个策略表示调用selector.selectorNow()还是selector.select()方法。从名字上可以看出来,selectNow()是非阻塞的,select()会阻塞到有监听的socket就绪。使用哪种策略主要是看NioEventLoop的任务队列内是不是有任务执行,也就是hasTasks()方法。如果有,调用selectNow()方法

没有本地任务时会调用select()方法阻塞一段时间,并将计数器加一。也是为了解决jdk空轮询的bug,具体方法是当计数器超过阈值512时,重建selector

回来到选择策略,选择策略确定后,会对selectedKeys做处理

selectedKeys表示本次选择器筛选出来的就绪状态的Channel,然后迭代这个selectedKeys,从key中拿到关联的Channel,然后检查这个就绪的事件是读?写?连接?事件,然后进行相应处理

比如处理读就绪事件,会把Socket缓冲区的数据load到ByteBuf中,然后调用pipeline.fireChannelRead(),这样socket数据就在pipeline中存在了。pipeline会依次调用inBound中相应的handler进行处理。这就是NioEventLoop对IO事件的处理

处理完了IO事件,还会调用runAllTask()方法对非IO任务处理。这个具体就是从队列中取出任务,再执行其run()方法

参考:

  • https://www.cnblogs.com/ZhuChangwu/p/11196791.html
  • https://www.shuzhiduo.com/A/kvJ3pWmw5g/
  • https://www.dandelioncloud.cn/article/details/1526511425988345858

分类:

后端

标签:

Java

作者介绍

q
qianxws
V1