Janker

V1

2022/09/04阅读:91主题:全栈蓝

Dubbo集群容错 - 容错机制的实现

Dubbo集群容错 - 容错机制的实现

前言

在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群 的形式出现的。在之前文章中 [Dubbo远程调用过程](Dubbo远程调用过程 (qq.com)) 我们已经了解了远程调用的实现,然而被调用的远程服务 并不是每时每刻都保持良好状况,当某个服务调用出现异常时,如网络抖动、服务短暂不可用 需要自动容错,或者只想本地测试、服务降级,需要Mock返回结果,就需要使用本篇提到集群容错机制。

概述

集群容错机制主要集中在源码dubbo-cluster模块中,我们可以把Cluster看作一个集群容错层,该层中包含ClusterDirectoryRouterLoadBalance 几大核心接口。注意这里要区分Cluster层和Cluster接口,Cluster层是抽象概念,表示的是对 外的整个集群容错层;Cluster是容错接口,提供FailoverFailfast等容错策略。

由于Cluster层的实现众多,因此下面介绍的流程是一个基于Abstractclusterinvoker的全量流程,某些实现可能只使用了该流程的一小部分。Cluster的总体工作流程可以分为以下几步:

  1. 生成Invoker对象。不同的Cluster实现会生成不同类型的Clusterinvoker对象并返回。然后调用ClusterinvokerInvoker方法,正式开始调用流程。
  2. 获得可调用的服务列表。首先会做前置校验,检查远程服务是否已被销毁。然后通过 Directory#list方法获取所有可用的服务列表。接着使用Router接口处理该服务列表,根据路由规则过滤一部分服务,最终返回剩余的服务列表。
  3. 做负载均衡。在第2步中得到的服务列表还需要通过不同的负载均衡策略选出一个服 务,用作最后的调用。首先框架会根据用户的配置,调用ExtensionLoader获取不同负载均衡策 略的扩展点实现(具体负载均衡策略会在后面讲解)。然后做一些后置操作,如果是异步调用则 设置调用编号。接着调用子类实现的dolnvoke方法(父类专门留了这个抽象方法让子类实现), 子类会根据具体的负载均衡策略选出一个可以调用的服务。
  4. 做RPC调用。首先保存每次调用的Invoker到RPC上下文,并做RPC调用。然后处 理调用结果,对于调用出现异常、成功、失败等情况,每种容错策略会有不同的处理方式。在本文后面会讲到不同容错策略的实现。

总体调用流程如下:

上图是一个全量的通用流程,其中1〜3步都是在抽象方法Abstractclusterinvoker中 实现的,可以理解为通用的模板流程,主要做了校验、参数准备等工作,最终调用子类实现的 dolnvoke方法。不同的ClusterInvoker子类都继承了该抽象类,子类会在上述流程中做个性化的裁剪。

容错机制的实现

Cluster接口一共有9种不同的实现,每种实现分别对应不同的Clusterlnvoker。本篇会介绍继承了 AbstractclusterInvoker 的7 种Clusterinvoker 实现,MergeMock属于特殊机制会在下一篇给大家介绍。

容错机制概述

Dubbo容错机制能增强整个应用的鲁棒性,容错过程对上层用户是完全透明的,但用户也 可以通过不同的配置项来选择不同的容错机制。每种容错机制又有自己个性化的配置项。Dubbo 中现有 FailoverFailfastFailsafeFallbackForkingBroadcast 等容错机制,容错机制的特性如下表。

机制名 机制简介
Failover 当出现失败时,会重试其他服务器。用户可以通过retries="2"设置重试次数。这是
Dubbo的默认容错机制,会对请求做负载均衡。通常使用在读操作或幕等的写操作上,
但重试会导致接口的延退增大,在下游机器负载已经达到极限时,重试容易加重下游
服务的负载
Failfast 快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制会对请求
做负载均衡,通常使用在非幕等接口的调用上。该机制受网络抖动的影响较大
Failsafe 当出现异常时,直接忽略异常。会对请求做负载均衡。通常使用在"佛系"调用场景,
即不关心调用是否成功,并且不想抛异常影响外层调用,如某些不重要的日志同步,
即使出现异常也无所谓
Fallback 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些
异步或最终一致性的请求。请求会做负载均衡
Forking 同时调用多个相同的服务,只要其中一个返回,则立即返回结果。用户可以配置
forks:"最大并行调用数"参数来确定最大并行调用的服务数量。
通常使用在对接口 实时性要求极高的调用上,但也会浪费更多的资源
Broadcast 广播调用所有可用的服务,任意一个节点报错则报错。由于是广播,因此请求不需要
做负载均衡。通常用于服务状态更新后的广播
Mock 提供调用失败时,返回伪造的响应结果。或直接强制返回伪造的结果,不会发起远程
调用
Available 最简单的方式,请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点,
直接请求并返回结果。如果没有可用的节点,则直接抛出异常
Mergeable Mergeable可以自动把多个节点请求得到的结果进行合并

Cluseter的具体实现:用户可以在<dubbo :service><dubbo:reference><dubbo:consumer>

<dubbo:provider>标签上通过cluster属性设置。 对于Failover容错模式,用户可以通过retries属性来设置最大重试次数。可以设置在dubbo: reference标签上,也可以设置在细粒度的方法标签dubbo:method上。

对于Forking容错模式,用户可通过forks="最大并行数"属性来设置最大并行数。假设设置 的forks数为n,可用的服务数为m,当n < m时,即可用的服务数大于配置的并行数,则并行请 求 n 个服务;当 n > m 时,即可用的服务数小于配置的并行数,则请求所有可用的服务m

对于Mergeable容错模式,用可以在dubbo:reference标签中通过merger="true"开启,合并时可以通过group="*"属性指定需要合并哪些分组的结果。默认会根据方法的返回值自动匹 配合并器,如果同一个类型有两个不同的合并器实现,则需要在参数中指定合并器的名字(merger="合并器名")。例如:用户根据某List类型的返回结果实现了多个合并器,则需要手动 指定合并器名称,否则框架不知道要用哪个。如果想调用返回结果的指定方法进行合并(如返 回了一个Set,想调用Set#addAll方法),则可以通过merger=".addAll'配置来实现。

官方 Mergeable配置示例如下。

<!-- 搜索所有分组,根据返回结果的类型自动查找合并器。该接口中getMenuItems方法不做合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>

<!-- 指定方法合并结果 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference>

<!-- 调用返回结果的指定方法进行合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>

Cluster 接口关系

在微服务环境中,可能多个节点同时都提供同一个服务。当上层调用Invoker时,无论实际 存在多少个Invoker,只需要通过Cluster层,即可完成整个调用的容错逻辑,包括获取服务列 表、路由、负载均衡等,整个过程对上层都是透明的。当然,Cluster接口只是串联起整个逻辑, 其中Clusterlnvokep只实现了容错策略部分,其他逻辑则是调用了 Directory、Router、LoadBalance 等接口实现。

容错的接口主要分为两大类,第一类是Cluster类,第二类是Clusterinvoker类。Cluster和 Clusterinvoker之间的关系也非常简单:Cluster接口下面有多种不同的实现,每种实现中都需要 实现接口的join方法,在方法中会“new”一个对应的Clusterinvoker实现。我们以FailoverCluster 实现为例进行说明,代码如下:

Cluster与Clusterinvoker之间的关系示例

public abstract class AbstractCluster implements Cluster {
 //.....
}
public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }

}

FailoverClusterCluster的其中一种实现,FailoverCluster中直接创建了一个新的 FailoverClusterlnvoker 并返回。FailoverClusterlnvoker 继承的接口是 Invoker。光看文字描述还是 比较难懂。因此,在理解集群容错的详细原理之前,我们先从“上帝视角”看一下整个集群容错的接口关系。Cluster接口的类图关系如下图。

ClusterInvoker总体类结构

Invoker 接口是最上层的接口,它下面分别有 AbstractClusterlnvokerMockClusterlnvokerMergeableClusterlnvoker H 个类。其中,AbstractClusterlnvoker 是一个抽象类,其封装了通用 的模板逻辑,如获取服务列表、负载均衡、调用服务提供者等,并预留了一个dolnvoke方法需 要子类自行实现。AbstractClusterlnvoker T面有7个子类,分别实现了不同的集群容错机制。

MockClusterlnvokerMergeableClusterlnvoker由于并不适用于正常的集群容错逻辑,因此 没有挂在AbstractClusterlnvoker下面,而是直接继承了 Invoker接口。

以上就是容错的接口,DirectoryRouterLoadBalance的接口会在以后的文章中详细探讨。

Failover 策略

Cluster接口上有SPI注解@SPI(FailoverCluster.NAME),即默认实现是Failover。该策略的 代码逻辑如下:

  1. 校验。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  2. 获取配置参数。从调用URL中获取对应的retries重试次数。
  3. 初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点(这 个会在负载均衡中使用,在某些配置下,尽量不要一直调用同一个服务)。
  4. 使用for循环实现重试,for循环的次数就是重试的次数。成功则返回,否则继续循环。 如果for循环完,还没有一个成功的返回,则抛出异常,把(3)中记录的信息抛出去。

前3步都是做一些校验、数据准备的工作。第4步开始真正的调用逻辑。以下步骤是for 循环中的逻辑:

  • 校验。如果for循环次数大于1,即有过一次失败,则会再次校验节点是否被销毁、传 入的Invoker列表是否为空。
  • 负载均衡。调用select方法做负载均衡,得到要调用的节点,并记录这个节点到步骤3 的集合里,再把己经调用的节点信息放进RPC上下文中。
  • 远程调用。调用invoker#invoke方法做远程调用,成功则返回,异常则记录异常信息, 再做下次循环。

Failover流程如下图

image-20220121214521249
image-20220121214521249

Failfast 策略

Failfast会在失败后直接抛出异常并返回,实现非常简单,步骤如下:

  • 校验。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  • 负载均衡。调用select方法做负载均衡,得到要调用的节点。
  • 进行远程调用。在try代码块中调用invoker#invoke方法做远程调用。如果捕获到异 常,则直接封装成RpcException抛出。

整个过程非常简短,也不会做任何中间信息的记录。

Failsafe 策略

Failsafe调用时如果出现异常,则会直接忽略。实现也非常简单,步骤如下:

  • 校验传入的参数。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  • 负载均衡。调用select方法做负载均衡,得到要调用的节点。
  • 远程调用。在try代码块中调用invoker#invoke方法做远程调用,"catch"到任何异常都直接"吞掉",返回一个空的结果集。

Failsafe调用源代码如下:

Failsafe调用源码

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
           //1 验传入的参数
            checkInvokers(invokers, invocation);
           //2 做负载均衡
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
           //3 进行远程调用,调用成功则直接返回
            return invokeWithContext(invoker, invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
           //捕获到异常,直接返回一个空的结果集
            return AsyncRpcResult.newDefaultAsyncResult(nullnull, invocation); // ignore
        }
    }

Fallback 策略

Fallback如果调用失败,则会定期重试。FailbackClusterlnvoker里面定义了一个 HashedWheelTimer,里面保存了很多定时失败需要重试的Task,重试失败重新放回HashedWheelTimer中,如果调用重试成功,则会清理serviceContext中移除。 dolnvoke的调用逻辑如下:

  1. 校验传入的参数。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  2. 负载均衡。调用select方法做负载均衡,得到要调用的节点。
  3. 远程调用。在try代码块中调用invoker#invoke方法做远程调用," catch" 到异常后 直接把invocationloadbalanceinvokersconsumerUrl等信息构建了一个RetryTimerTask重试任务。
  4. 定时HashedWheelTimer会定时执行RetryTimerTask,如果请求还是失败,则会重新添加到HashedWheelTimer中。

Fallback重试源码如下:

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
        try {
           //校验传入的参数
            checkInvokers(invokers, invocation);
           //负载均衡
            invoker = select(loadbalance, invocation, invokers, null);
            // Asynchronous call method must be used here, because failback will retry in the background.
            // Then the serviceContext will be cleared after the call is completed.
           //远程调用
            return invokeWithContextAsync(invoker, invocation, consumerUrl);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
            if (retries > 0) {
               ////添加失败的请求到重试failTimer中
                addFailed(loadbalance, invocation, invokers, invoker, consumerUrl);
            }
            return AsyncRpcResult.newDefaultAsyncResult(nullnull, invocation); // ignore
        }
    }
 
//添加失败的请求到重试failTimer中
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, URL consumerUrl) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer(
                        new NamedThreadFactory("failback-cluster-timer"true),
                        1,
                        TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
     //构建RetryTimerTask
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage());
        }
}

Available 策略

Available 是找到第一个可用的服务直接调用,并返回结果。步骤如下:

  1. 遍历从AbstractClusterlnvoker传入的Invoker列表,如果Invoker是可用的,则直接调用并返回。
  2. 如果遍历整个列表还没找到可用的Invoker,则抛出异常

Broadcast 策略

Broadcast会广播给所有可用的节点,如果任何一个节点报错,则返回异常。步骤如下:

  1. 前置操作。校验从AbstractClusterlnvoker传入的Invoker列表是否为空;在RPC上下 文中设置Invoker列表;初始化一些对象,用于保存调用过程中产生的异常和结果信息等。
  2. 循环遍历所有Invoker,直接做RPC调用。任何一个节点调用出错,并不会中断整个 广播过程,会先记录异常,在最后广播完成后再抛出。如果多个节点异常,则只有最后一个节 点的异常会被抛出,前面的异常会被覆盖。

Forking 策略

Forking可以同时并行请求多个服务,有任何一个返回,则直接返回。相对于其他调用策略,Forking的实现是最复杂的。其步骤如下:

  1. 准备工作。校验传入的Invoker列表是否可用;初始化一个Invoker集合,用于保存真正要调用的Invoker列表;从URL中得到最大并行数、超时时间。

  2. 获取最终要调用的Invoker列表。假设用户设置最大的并行数为n, 实际可以调用的 最大服务数为m。如果n < 0或 n < m,则说明可用的服务数小于用户的设置,因此最终要调用的 Invoker 只能有m个;如果 n < m ,则会循环调用负载均衡方法,不断得到可调用的Invoker,加入 步骤1中的Invoker集合里。

    这里有一点需要注意:在Invoker加入集合时,会做去重操作。因此,如果用户设置的负载 均衡策略每次返回的都是同一个Invoker,那么集合中最后只会存在一个Invoker,也就是只会 调用一个节点。

  3. 调用前的准备工作。设置要调用的Invoker列表到RPC上下文;初始化一个异常计数 器;初始化一个阻塞队列,用于记录并行调用的结果。

  4. 执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列;调用失败, 则失败计数+1。如果所有线程的调用都失败了,即失败计数大于等于所有可调用的Invoker时,则把 异常信息加入阻塞队列。

    这里有一点需要注意:并行调用是如何保证个别调用失败不返回异常信息,只有全部失败 才返回异常信息的呢?因为有判断条件,当失败计数N所有可调用的Invoker时,才会把异常信 息放入阻塞队列,所以只有当最后一个Invoker也调用失败时才会把异常信息保存到阻塞队列, 从而达到全部失败才返回异常的效果。

  5. 同步等待结果。由于步骤4中的步骤是在线程池中执行的,因此主线程还会继续往下执行,主线程中会使用阻塞队列的poll("超时时间")方法,同步等待阻塞队列中的第一个结果, 如果是正常结果则返回,如果是异常则抛出。

从上面步骤可以得知,Forking的超时是通过在阻塞队列的poll方法中传入超时时间实现的; 线程池中的并发调用会获取第一个正常返回结果。只有所有请求都失败了,Forking才会失败。 Forking调用流程如图

总结

以上就是所有普通容错策略的实现原理,他在Cluster模块中是一个非常重要的组件之一,类似的在这个模块非常重要的组件还包括,DirectoryRouterLoadBalance其实现原理,在下一篇中会深入探索,敬请期待。

分类:

后端

标签:

后端

作者介绍

Janker
V1