q

qianxws

V1

2022/12/28阅读:149主题:橙心

netty中ServerBootstrap启动源码分析(上)

0.启动案例

以源码工程 example 中 io.netty.example.echo 包 EchoServer 为例:

// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() 
{
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     });

    // Start the server.
    ChannelFuture f = b.bind(PORT).sync();

    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();
finally {
    // Shut down all event loops to terminate all threads.
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

先看核心类 ServerBootstrap 的继承结构

b.bind(PORT)说起

最终会定位到AbstractBootstrap#ChannelFuture doBind(final SocketAddress localAddress)

精简后的主干代码

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1.创建和初始化NioServerSocketChannel并注册到boss线程,返回异步对象
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 2.绑定
    if (regFuture.isDone()) {
        // 返回异步任务完成直接绑定
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // 异步任务未完成则监听任务,等待回调执行绑定
        regFuture.addListener({
            doBind0(regFuture, channel, localAddress, promise);
        )};
    }
}

下面就来看一下这两个核心方法

  • initAndRegister()
  • doBind0()

1.initAndRegister()

此方法为服务端创建 NioServerSocketChannel 的主干方法,提取抽象功能代码如下。可以看出该方法表达的功能也很简单,就是创建->初始化->注册。下面我们将这三步逐个分析一下。

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1.创建channel
            channel = channelFactory.newChannel();
            // 2.初始化channel
            init(channel);
        } catch (Throwable t) {
            // 异常处理
        }
        // 3.注册到boss线程和worker线程
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }

1.1 创建 Channel

准确来说是创建 NioServerSocketChannel

可以看到,这里仅仅调用了一个工厂方法,这个工厂方法的实现也仅仅是通过反射的方式创建 Channel

ReflectiveChannelFactory#newChannel()

@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

我们再回头看一下 ServerBootstrap 的配置方法

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

发现当时是传入的某个 Channel 的实现类的字节码对象,而且 ServerBootstrap.channel()

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

会通过构造ReflectiveChannelFactory实例来放入一个 Channel 工厂

我们回到initAndRegister()中的channelFactory.newChannel();会很容易理解了,直接就会创建出 NioServerSocketChannel 实例

前人栽树,这里乘凉。无需多言

1.2 init(channel)

接下来就是配置创建好的 NioServerSocketChannel 实例,包含一些 options、attributes 等等,比如之前我们配置好的一些参数在此时会赋值给 channel 实例

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .option(ChannelOption.SO_BACKLOG, 100// 相关配置

1.3 register(channel)

源码中是通过ChannelFuture regFuture = config().group().register(channel);进行注册的。

初看代码,顿时产生几个疑问(真是字少事大)

  • config()是什么?
  • config().group()是什么?
  • register(channel)是将什么注册到哪里?

别急,源码之下无秘密,逐个击破

config() 在 ServerBootstrap 中有这样的配置属性 final ServerBootstrapConfig config = new ServerBootstrapConfig(this); 在实例化启动类的时候也会实例化

config()是返回启动类的配置类,那么group()目测跟 boss 线程和 worker 线程有关,是不是呢,看源码

b.group(boss,worker)方法中

// ServerBootstrap#group配置参数时的方法
 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
     // 关键看这里,将父类的group属性赋值为boss线程,即AbstractBootstrap的group属性
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}

// AbstractBootstrap#group
public B group(EventLoopGroup group) {
    ObjectUtil.checkNotNull(group, "group");
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    // 赋值父类
    this.group = group;
    return self();
}

可以看到,无非就是对启动类内部的 group 属性进行赋值,这里直接取出而已。

总结,config()和 group()仅仅是将前置的配置取出,只不过 config()返回的对象是包含当前启动类的配置类实例,然后再根据配置中的启动实例取出其中的 boss 线程,也就是 Reactor 线程模型的主 Reactor 线程。总结如下

  • config()是持有 ServerBootstrap 实例的配置实例
  • config().group()是 boss 线程 NioEventLoopGroup 实例

到这里问题就变成了boss.register(channel)如何进行注册

NioEventLoopGroup 的继承关系

NioEventLoopGroup 没有对 register()方法的实现,会走其父类注册方法

MultithreadEventLoopGroup#register(Channel channel)

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

通过 debug 可以看到,next()是 NioEventLoopGroup 选择出一个 NioEventLoop 来完成注册任务,最终由 SingleThreadEventLoop#regitster(...) -> AbstractChannel#register(...)执行

AbstractChannel#regitster抽取主干逻辑

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    // 1.将当前eventLoop与channel关联起来,这样就保证了每个channel都有一个eventLoop,都有线程资源
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        // 2.当前线程,直接注册
        register0(promise);
    } else {
        try {
            // 3.若非当前线程,交给任务队列注册
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 异常处理
        }
    }
}

AbstractChannel#regitster0抽取主干逻辑

private void register0(ChannelPromise promise) {
  try {
      // 1.具体注册
      doRegister();
      // 2.channel已经在bossGroup上了,执行listener回调
      pipeline.invokeHandlerAddedIfNeeded();
      // 3.cas设置成功状态
      safeSetSuccess(promise);
      // 4.在pipeline中传播注册事件
      pipeline.fireChannelRegistered();
      ...
  } catch (Throwable t) {
      // 异常处理
  }
}

AbstractNioChannel#doRegister抽取逻辑,可以看到实际上是对 jdk 原生 channel 的注册

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 实际上对jdk Channel的注册
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                // 异常处理
            }
        }
    }
}

2.doBind0()

AbstractBootstrap#doBind0

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise)
 
{
  // 1.将绑定方法封装成异步任务,放在任务队列中
  // 2.由NioServerSocketChannel中的NioEventLoop执行
  channel.eventLoop().execute(new Runnable() {
      @Override
      public void run() {
          if (regFuture.isSuccess()) {
              channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
          } else {
              promise.setFailure(regFuture.cause());
          }
      }
  });
}

最终由 SingleThreadEventExcutor#excute -> excute0 -> execute(Runnable task, boolean immediate)执行 SingleThreadEventExecutor#execute(Runnable task, boolean immediate)抽取关键逻辑

private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        // 1.将当前任务放入队列
        addTask(task);
        if (!inEventLoop) {
            // 2.执行
            startThread();
        }
    }

startThread -> doStartThread

由此开启线程

分类:

后端

标签:

Java

作者介绍

q
qianxws
V1