qianxws
2022/12/28阅读:149主题:橙心
netty中ServerBootstrap启动源码分析(上)
0.启动案例
以源码工程 example 中 io.netty.example.echo 包 EchoServer 为例:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
先看核心类 ServerBootstrap 的继承结构
从
b.bind(PORT)
说起
最终会定位到AbstractBootstrap#ChannelFuture doBind(final SocketAddress localAddress)
精简后的主干代码
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1.创建和初始化NioServerSocketChannel并注册到boss线程,返回异步对象
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2.绑定
if (regFuture.isDone()) {
// 返回异步任务完成直接绑定
doBind0(regFuture, channel, localAddress, promise);
} else {
// 异步任务未完成则监听任务,等待回调执行绑定
regFuture.addListener({
doBind0(regFuture, channel, localAddress, promise);
)};
}
}
下面就来看一下这两个核心方法
-
initAndRegister() -
doBind0()
1.initAndRegister()
此方法为服务端创建 NioServerSocketChannel 的主干方法,提取抽象功能代码如下。可以看出该方法表达的功能也很简单,就是创建->初始化->注册
。下面我们将这三步逐个分析一下。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1.创建channel
channel = channelFactory.newChannel();
// 2.初始化channel
init(channel);
} catch (Throwable t) {
// 异常处理
}
// 3.注册到boss线程和worker线程
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
1.1 创建 Channel
准确来说是创建 NioServerSocketChannel
可以看到,这里仅仅调用了一个工厂方法,这个工厂方法的实现也仅仅是通过反射的方式创建 Channel
ReflectiveChannelFactory#newChannel()
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
我们再回头看一下 ServerBootstrap 的配置方法
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
...
发现当时是传入的某个 Channel 的实现类的字节码对象,而且 ServerBootstrap.channel()
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
会通过构造ReflectiveChannelFactory
实例来放入一个 Channel 工厂
我们回到initAndRegister()
中的channelFactory.newChannel();
会很容易理解了,直接就会创建出 NioServerSocketChannel 实例
前人栽树,这里乘凉。无需多言
1.2 init(channel)
接下来就是配置创建好的 NioServerSocketChannel 实例,包含一些 options、attributes 等等,比如之前我们配置好的一些参数在此时会赋值给 channel 实例
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.option(ChannelOption.SO_BACKLOG, 100) // 相关配置
1.3 register(channel)
源码中是通过ChannelFuture regFuture = config().group().register(channel);
进行注册的。
初看代码,顿时产生几个疑问(真是字少事大)
-
config()是什么? -
config().group()是什么? -
register(channel)是将什么注册到哪里?
别急,源码之下无秘密,逐个击破
config()
在 ServerBootstrap 中有这样的配置属性 final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
在实例化启动类的时候也会实例化
config()是返回启动类的配置类,那么group()
目测跟 boss 线程和 worker 线程有关,是不是呢,看源码
在b.group(boss,worker)
方法中
// ServerBootstrap#group配置参数时的方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
// 关键看这里,将父类的group属性赋值为boss线程,即AbstractBootstrap的group属性
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
// AbstractBootstrap#group
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
// 赋值父类
this.group = group;
return self();
}
可以看到,无非就是对启动类内部的 group 属性进行赋值,这里直接取出而已。
总结,config()和 group()仅仅是将前置的配置取出,只不过 config()返回的对象是包含当前启动类的配置类实例,然后再根据配置中的启动实例取出其中的 boss 线程,也就是 Reactor 线程模型的主 Reactor 线程。总结如下
-
config()是持有 ServerBootstrap 实例的配置实例 -
config().group()是 boss 线程 NioEventLoopGroup 实例
到这里问题就变成了boss.register(channel)
如何进行注册
NioEventLoopGroup 的继承关系

NioEventLoopGroup 没有对 register()方法的实现,会走其父类注册方法
MultithreadEventLoopGroup#register(Channel channel)
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
通过 debug 可以看到,next()是 NioEventLoopGroup 选择出一个 NioEventLoop 来完成注册任务,最终由 SingleThreadEventLoop#regitster(...) -> AbstractChannel#register(...)执行
AbstractChannel#regitster
抽取主干逻辑
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 1.将当前eventLoop与channel关联起来,这样就保证了每个channel都有一个eventLoop,都有线程资源
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
// 2.当前线程,直接注册
register0(promise);
} else {
try {
// 3.若非当前线程,交给任务队列注册
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 异常处理
}
}
}
AbstractChannel#regitster0
抽取主干逻辑
private void register0(ChannelPromise promise) {
try {
// 1.具体注册
doRegister();
// 2.channel已经在bossGroup上了,执行listener回调
pipeline.invokeHandlerAddedIfNeeded();
// 3.cas设置成功状态
safeSetSuccess(promise);
// 4.在pipeline中传播注册事件
pipeline.fireChannelRegistered();
...
} catch (Throwable t) {
// 异常处理
}
}
AbstractNioChannel#doRegister
抽取逻辑,可以看到实际上是对 jdk 原生 channel 的注册
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 实际上对jdk Channel的注册
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
// 异常处理
}
}
}
}
2.doBind0()
AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 1.将绑定方法封装成异步任务,放在任务队列中
// 2.由NioServerSocketChannel中的NioEventLoop执行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
最终由 SingleThreadEventExcutor#excute -> excute0 -> execute(Runnable task, boolean immediate)执行 SingleThreadEventExecutor#execute(Runnable task, boolean immediate)
抽取关键逻辑
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// 1.将当前任务放入队列
addTask(task);
if (!inEventLoop) {
// 2.执行
startThread();
}
}
startThread -> doStartThread
由此开启线程
作者介绍