Janker
2022/09/04阅读:91主题:全栈蓝
Dubbo集群容错 - 容错机制的实现
Dubbo集群容错 - 容错机制的实现
前言
在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群 的形式出现的。在之前文章中 [Dubbo远程调用过程](Dubbo远程调用过程 (qq.com)) 我们已经了解了远程调用的实现,然而被调用的远程服务 并不是每时每刻都保持良好状况,当某个服务调用出现异常时,如网络抖动、服务短暂不可用 需要自动容错,或者只想本地测试、服务降级,需要Mock返回结果,就需要使用本篇提到集群容错机制。
概述
集群容错机制主要集中在源码dubbo-cluster
模块中,我们可以把Cluster
看作一个集群容错层,该层中包含Cluster
、Directory
、Router
、LoadBalance
几大核心接口。注意这里要区分Cluster
层和Cluster
接口,Cluster
层是抽象概念,表示的是对 外的整个集群容错层;Cluster
是容错接口,提供Failover
、Failfast
等容错策略。
由于Cluster
层的实现众多,因此下面介绍的流程是一个基于Abstractclusterinvoker
的全量流程,某些实现可能只使用了该流程的一小部分。Cluster
的总体工作流程可以分为以下几步:
-
生成 Invoker
对象。不同的Cluster
实现会生成不同类型的Clusterinvoker
对象并返回。然后调用Clusterinvoker
的Invoker
方法,正式开始调用流程。 -
获得可调用的服务列表。首先会做前置校验,检查远程服务是否已被销毁。然后通过 Directory#list
方法获取所有可用的服务列表。接着使用Router
接口处理该服务列表,根据路由规则过滤一部分服务,最终返回剩余的服务列表。 -
做负载均衡。在第2步中得到的服务列表还需要通过不同的负载均衡策略选出一个服 务,用作最后的调用。首先框架会根据用户的配置,调用 ExtensionLoader
获取不同负载均衡策 略的扩展点实现(具体负载均衡策略会在后面讲解)。然后做一些后置操作,如果是异步调用则 设置调用编号。接着调用子类实现的dolnvoke
方法(父类专门留了这个抽象方法让子类实现), 子类会根据具体的负载均衡策略选出一个可以调用的服务。 -
做RPC调用。首先保存每次调用的 Invoker
到RPC上下文,并做RPC调用。然后处 理调用结果,对于调用出现异常、成功、失败等情况,每种容错策略会有不同的处理方式。在本文后面会讲到不同容错策略的实现。
「总体调用流程如下:」

上图是一个全量的通用流程,其中1〜3步都是在抽象方法Abstractclusterinvoker
中 实现的,可以理解为通用的模板流程,主要做了校验、参数准备等工作,最终调用子类实现的 dolnvoke
方法。不同的ClusterInvoker
子类都继承了该抽象类,子类会在上述流程中做个性化的裁剪。
容错机制的实现
Cluster
接口一共有9种不同的实现,每种实现分别对应不同的Clusterlnvoker
。本篇会介绍继承了 AbstractclusterInvoker
的7 种Clusterinvoker
实现,Merge
和Mock
属于特殊机制会在下一篇给大家介绍。
容错机制概述
Dubbo
容错机制能增强整个应用的鲁棒性,容错过程对上层用户是完全透明的,但用户也 可以通过不同的配置项来选择不同的容错机制。每种容错机制又有自己个性化的配置项。Dubbo
中现有 Failover
、 Failfast
、 Failsafe
、Fallback
、 Forking
、 Broadcast
等容错机制,容错机制的特性如下表。
机制名 | 机制简介 |
---|---|
Failover | 当出现失败时,会重试其他服务器。用户可以通过retries="2"设置重试次数。这是 Dubbo的默认容错机制,会对请求做负载均衡。通常使用在读操作或幕等的写操作上, 但重试会导致接口的延退增大,在下游机器负载已经达到极限时,重试容易加重下游 服务的负载 |
Failfast | 快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制会对请求 做负载均衡,通常使用在非幕等接口的调用上。该机制受网络抖动的影响较大 |
Failsafe | 当出现异常时,直接忽略异常。会对请求做负载均衡。通常使用在"佛系"调用场景, 即不关心调用是否成功,并且不想抛异常影响外层调用,如某些不重要的日志同步, 即使出现异常也无所谓 |
Fallback | 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些 异步或最终一致性的请求。请求会做负载均衡 |
Forking | 同时调用多个相同的服务,只要其中一个返回,则立即返回结果。用户可以配置 forks:"最大并行调用数" 参数来确定最大并行调用的服务数量。通常使用在对接口 实时性要求极高的调用上,但也会浪费更多的资源 |
Broadcast | 广播调用所有可用的服务,任意一个节点报错则报错。由于是广播,因此请求不需要 做负载均衡。通常用于服务状态更新后的广播 |
Mock | 提供调用失败时,返回伪造的响应结果。或直接强制返回伪造的结果,不会发起远程 调用 |
Available | 最简单的方式,请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点, 直接请求并返回结果。如果没有可用的节点,则直接抛出异常 |
Mergeable | Mergeable可以自动把多个节点请求得到的结果进行合并 |
Cluseter的具体实现:用户可以在<dubbo :service>
、<dubbo:reference>
、<dubbo:consumer>
、
<dubbo:provider>
标签上通过cluster
属性设置。 对于Failover
容错模式,用户可以通过retries
属性来设置最大重试次数。可以设置在dubbo: reference
标签上,也可以设置在细粒度的方法标签dubbo:method
上。
对于Forking
容错模式,用户可通过forks="最大并行数"
属性来设置最大并行数。假设设置 的forks
数为n
,可用的服务数为m
,当n
< m
时,即可用的服务数大于配置的并行数,则并行请 求 n
个服务;当 n
> m
时,即可用的服务数小于配置的并行数,则请求所有可用的服务m
。
对于Mergeable
容错模式,用可以在dubbo:reference
标签中通过merger="true"
开启,合并时可以通过group="*"
属性指定需要合并哪些分组的结果。默认会根据方法的返回值自动匹 配合并器,如果同一个类型有两个不同的合并器实现,则需要在参数中指定合并器的名字(merger="合并器名"
)。例如:用户根据某List
类型的返回结果实现了多个合并器,则需要手动 指定合并器名称,否则框架不知道要用哪个。如果想调用返回结果的指定方法进行合并(如返 回了一个Set
,想调用Set#addAll
方法),则可以通过merger=".addAll'
配置来实现。
官方 Mergeable
配置示例如下。
<!-- 搜索所有分组,根据返回结果的类型自动查找合并器。该接口中getMenuItems方法不做合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>
<!-- 指定方法合并结果 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference>
<!-- 调用返回结果的指定方法进行合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>
Cluster 接口关系
在微服务环境中,可能多个节点同时都提供同一个服务。当上层调用Invoker时,无论实际 存在多少个Invoker,只需要通过Cluster层,即可完成整个调用的容错逻辑,包括获取服务列 表、路由、负载均衡等,整个过程对上层都是透明的。当然,Cluster接口只是串联起整个逻辑, 其中Clusterlnvokep只实现了容错策略部分,其他逻辑则是调用了 Directory、Router、LoadBalance 等接口实现。
容错的接口主要分为两大类,第一类是Cluster类,第二类是Clusterinvoker类。Cluster和 Clusterinvoker之间的关系也非常简单:Cluster接口下面有多种不同的实现,每种实现中都需要 实现接口的join方法,在方法中会“new”一个对应的Clusterinvoker实现。我们以FailoverCluster 实现为例进行说明,代码如下:
「Cluster与Clusterinvoker之间的关系示例」
public abstract class AbstractCluster implements Cluster {
//.....
}
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
FailoverCluster
是Cluster
的其中一种实现,FailoverCluster
中直接创建了一个新的 FailoverClusterlnvoker
并返回。FailoverClusterlnvoker
继承的接口是 Invoker
。光看文字描述还是 比较难懂。因此,在理解集群容错的详细原理之前,我们先从“上帝视角”看一下整个集群容错的接口关系。Cluster
接口的类图关系如下图。

「ClusterInvoker总体类结构」

Invoker
接口是最上层的接口,它下面分别有 AbstractClusterlnvoker
、 MockClusterlnvoker
和 MergeableClusterlnvoker
H 个类。其中,AbstractClusterlnvoker
是一个抽象类,其封装了通用 的模板逻辑,如获取服务列表、负载均衡、调用服务提供者等,并预留了一个dolnvoke方法需 要子类自行实现。AbstractClusterlnvoker
T面有7个子类,分别实现了不同的集群容错机制。
MockClusterlnvoker
和MergeableClusterlnvoker
由于并不适用于正常的集群容错逻辑,因此 没有挂在AbstractClusterlnvoker
下面,而是直接继承了 Invoker
接口。
以上就是容错的接口,Directory
、Router
和LoadBalance
的接口会在以后的文章中详细探讨。
Failover 策略
Cluster
接口上有SPI
注解@SPI(FailoverCluster.NAME)
,即默认实现是Failover
。该策略的 代码逻辑如下:
-
校验。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
获取配置参数。从调用 URL
中获取对应的retries
重试次数。 -
初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点(这 个会在负载均衡中使用,在某些配置下,尽量不要一直调用同一个服务)。 -
使用for循环实现重试,for循环的次数就是重试的次数。成功则返回,否则继续循环。 如果for循环完,还没有一个成功的返回,则抛出异常,把(3)中记录的信息抛出去。
前3步都是做一些校验、数据准备的工作。第4步开始真正的调用逻辑。以下步骤是for 循环中的逻辑:
-
校验。如果for循环次数大于1,即有过一次失败,则会再次校验节点是否被销毁、传 入的 Invoker
列表是否为空。 -
负载均衡。调用 select
方法做负载均衡,得到要调用的节点,并记录这个节点到步骤3 的集合里,再把己经调用的节点信息放进RPC
上下文中。 -
远程调用。调用 invoker#invoke
方法做远程调用,成功则返回,异常则记录异常信息, 再做下次循环。
Failover流程如下图

Failfast 策略
Failfast会在失败后直接抛出异常并返回,实现非常简单,步骤如下:
-
校验。校验从 AbstractClusterlnvoker
传入的Invoker列表是否为空。 -
负载均衡。调用 select
方法做负载均衡,得到要调用的节点。 -
进行远程调用。在try代码块中调用 invoker#invoke
方法做远程调用。如果捕获到异 常,则直接封装成RpcException
抛出。
整个过程非常简短,也不会做任何中间信息的记录。
Failsafe 策略
Failsafe
调用时如果出现异常,则会直接忽略。实现也非常简单,步骤如下:
-
校验传入的参数。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
负载均衡。调用 select
方法做负载均衡,得到要调用的节点。 -
远程调用。在 try
代码块中调用invoker#invoke
方法做远程调用,"catch
"到任何异常都直接"吞掉",返回一个空的结果集。
Failsafe
调用源代码如下:
「Failsafe调用源码」
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
//1 验传入的参数
checkInvokers(invokers, invocation);
//2 做负载均衡
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
//3 进行远程调用,调用成功则直接返回
return invokeWithContext(invoker, invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
//捕获到异常,直接返回一个空的结果集
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
Fallback 策略
Fallback
如果调用失败,则会定期重试。FailbackClusterlnvoker
里面定义了一个 HashedWheelTimer
,里面保存了很多定时失败需要重试的Task,重试失败重新放回HashedWheelTimer
中,如果调用重试成功,则会清理serviceContext
中移除。 dolnvoke
的调用逻辑如下:
-
校验传入的参数。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
负载均衡。调用select方法做负载均衡,得到要调用的节点。 -
远程调用。在 try
代码块中调用invoker#invoke
方法做远程调用,"catch
" 到异常后 直接把invocation
、loadbalance
、invokers
、consumerUrl
等信息构建了一个RetryTimerTask
重试任务。 -
定时 HashedWheelTimer
会定时执行RetryTimerTask
,如果请求还是失败,则会重新添加到HashedWheelTimer
中。
Fallback重试源码如下:
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
try {
//校验传入的参数
checkInvokers(invokers, invocation);
//负载均衡
invoker = select(loadbalance, invocation, invokers, null);
// Asynchronous call method must be used here, because failback will retry in the background.
// Then the serviceContext will be cleared after the call is completed.
//远程调用
return invokeWithContextAsync(invoker, invocation, consumerUrl);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
if (retries > 0) {
////添加失败的请求到重试failTimer中
addFailed(loadbalance, invocation, invokers, invoker, consumerUrl);
}
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
//添加失败的请求到重试failTimer中
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, URL consumerUrl) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
//构建RetryTimerTask
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage());
}
}
Available 策略
Available
是找到第一个可用的服务直接调用,并返回结果。步骤如下:
-
遍历从 AbstractClusterlnvoker
传入的Invoker
列表,如果Invoker
是可用的,则直接调用并返回。 -
如果遍历整个列表还没找到可用的 Invoker
,则抛出异常
Broadcast 策略
Broadcast
会广播给所有可用的节点,如果任何一个节点报错,则返回异常。步骤如下:
-
前置操作。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空;在RPC
上下 文中设置Invoker
列表;初始化一些对象,用于保存调用过程中产生的异常和结果信息等。 -
循环遍历所有 Invoker
,直接做RPC
调用。任何一个节点调用出错,并不会中断整个 广播过程,会先记录异常,在最后广播完成后再抛出。如果多个节点异常,则只有最后一个节 点的异常会被抛出,前面的异常会被覆盖。
Forking 策略
Forking
可以同时并行请求多个服务,有任何一个返回,则直接返回。相对于其他调用策略,Forking
的实现是最复杂的。其步骤如下:
-
准备工作。校验传入的
Invoker
列表是否可用;初始化一个Invoker
集合,用于保存真正要调用的Invoker
列表;从URL
中得到最大并行数、超时时间。 -
获取最终要调用的
Invoker
列表。假设用户设置最大的并行数为n
, 实际可以调用的 最大服务数为m
。如果n
< 0或n
<m
,则说明可用的服务数小于用户的设置,因此最终要调用的Invoker
只能有m
个;如果n
<m
,则会循环调用负载均衡方法,不断得到可调用的Invoker
,加入 步骤1中的Invoker
集合里。这里有一点需要注意:在
Invoker
加入集合时,会做去重操作。因此,如果用户设置的负载 均衡策略每次返回的都是同一个Invoker
,那么集合中最后只会存在一个Invoker
,也就是只会 调用一个节点。 -
调用前的准备工作。设置要调用的
Invoker
列表到RPC
上下文;初始化一个异常计数 器;初始化一个阻塞队列,用于记录并行调用的结果。 -
执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列;调用失败, 则失败计数+1。如果所有线程的调用都失败了,即失败计数大于等于所有可调用的
Invoker
时,则把 异常信息加入阻塞队列。这里有一点需要注意:并行调用是如何保证个别调用失败不返回异常信息,只有全部失败 才返回异常信息的呢?因为有判断条件,当失败计数N所有可调用的
Invoker
时,才会把异常信 息放入阻塞队列,所以只有当最后一个Invoker
也调用失败时才会把异常信息保存到阻塞队列, 从而达到全部失败才返回异常的效果。 -
同步等待结果。由于步骤4中的步骤是在线程池中执行的,因此主线程还会继续往下执行,主线程中会使用阻塞队列的
poll
("超时时间")方法,同步等待阻塞队列中的第一个结果, 如果是正常结果则返回,如果是异常则抛出。
从上面步骤可以得知,Forking
的超时是通过在阻塞队列的poll方法中传入超时时间实现的; 线程池中的并发调用会获取第一个正常返回结果。只有所有请求都失败了,Forking
才会失败。 Forking
调用流程如图

总结
以上就是所有普通容错策略的实现原理,他在Cluster
模块中是一个非常重要的组件之一,类似的在这个模块非常重要的组件还包括,Directory
、 Router
、 LoadBalance
其实现原理,在下一篇中会深入探索,敬请期待。
作者介绍