jrh

V1

2023/02/27阅读:20主题:萌粉

Java 面试八股文之消息队列篇(一)

前言

封面取自电影《Finch》,如果你看过这部电影,就一定记得曾经让我们泪流满面的这一幕。 ——古德耶尔和杰夫最爱的人

这是系列文章【 Java 面试八股文 】消息队列篇的第一期。

【 Java 面试八股文 】系列会陆续更新 Java 面试中的高频问题,旨在从问题出发,带你理解 Java 基础,数据结构与算法,数据库,常用框架等知识点。该系列前几期文章可以通过下方给出的链接进行查看~

往期文章

消息队列篇(一)

1、消息队列是什么?有什么作用?


什么是消息队列

消息队列(Message Queue)简称为 MQ,是一种可以应用于分布式系统中的中间件。

消息是信息的载体,在软件系统中,消息就是服务与服务之间传递的数据;而队列则是一种先进先出(FIFO)的数据结构。我们将负责发送消息的一方称为生产者(Producer),而负责消费处理消息的一方称为消费者(Consumer)。由于队列这种数据结构的特性,消息也会按照先进先出的顺序被消费者依次消费。

消息队列有什么作用?

接下来,我们就从一个外卖点餐系统入手,学习一下为什么要使用消息队列,以及消息队列究竟有什么作用。

该外卖点餐系统由以下四种微服务构成:

  • 订单微服务
  • 商家微服务
  • 骑手微服务
  • 结算微服务

其业务流程图如下所示:

如我们所见,该系统各种微服务之间为同步调用,同步直接调用会带来以下几种问题:

  • 业务调用链过长,用户需要长时间等待
  • 如果部分组件出现故障,则会使整个业务发生瘫痪
  • 业务高峰期没有缓冲,假设下单高峰,订单,商家,骑手微服务处理的速度比结算微服务快很多,那么结算微服务就会堆积大量待处理的请求,从而致使系统发生瘫痪

而这个时候,我们就可以使用消息队列来解决这些问题。

来看一下使用消息队列后的业务流程图:

首先,我们的系统从同步调用改进为异步调用,图中虚线隔开的部分表示为异步调用。从流程图中可以直观地看出,由于异步,用户不再需要长时间的等待了,在用户下订单后,异步线程 a 会立刻响应用户订单处理中,异步线程 b 则会开始执行业务逻辑。

在使用消息队列后,四种微服务会调用消息中间件来进行消息的收发,而不再与彼此相互耦合:

这样做的好处是,如果某一个微服务宕机,那么消息还是会保存在消息队列中,等到该微服务重启后,仍然能继续处理消息。

并且,业务高峰期没有缓冲这一问题也可以得到解决。业务高峰期产生的消息会被存储到消息队列中,然后我们根据各个微服务的能力,慢慢去消费这些消息,这样就可以避免某个微服务因堆积了大量请求而瘫痪的问题。

通过这样一个示例,我们可以总结出,为什么要使用消息队列,或者说消息队列的作用是什么?

用简单的三句话来概括:

  1. 异步处理
  2. 系统解耦
  3. 削峰限流

那么,使用消息队列会带来哪些问题呢?

  1. 系统可用性降低
  2. 系统复杂性变高
  3. 一致性问题

如果 MQ 挂掉,就会导致系统发生崩溃,所以系统的可用性降低了;我们引入了 MQ 以后,需要考虑消息是否有被重复消费、丢失等问题,所以提升了系统的复杂性;MQ 可以实现异步调用,异步调用虽然提升了系统的响应速度,但是也会带来一致性问题,假如生产者发送消息后,由于某些问题导致业务逻辑执行出错,从而发生事务回滚,消费者没有正确地消费消息,这便会产生一致性问题。

主流的消息中间件有哪些?

当前主流的消息中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka、Apollo 等,不过应用最为广泛的还是 RabbitMQ、RocketMQ 以及 Kafka。这三种消息中间件也是各有各的优势:

RabbitMQ

优点:

  • 基于 Erlang 开发,支持高并发
  • 高可靠性,支持发送确认,投递确认等特性
  • 高可用性,支持镜像队列
  • ...

缺点:

  • Erlang 语言较为小众,不利于二次开发
  • 代理架构下,中央节点增加了延迟,影响性能
  • 使用 AMQP 协议,使用起来有一定学习成本
  • ...

RocketMQ

优点:

  • 基于 Java,方便二次开发
  • 单机吞吐量为十万级,比 RabbitMQ 还高出一个量级
  • 高可用性,采用分布式架构
  • 高可靠性,经过参数优化配置,消息可做到 0 丢失
  • ...

缺点:

  • 支持的客户端语言不多,较为成熟的有 Java,C++ 以及 Go
  • 没有 Web 管理界面,仅提供一个 CLI
  • ...

Kafka

优点:

  • 原生的分布式系统
  • 使用零拷贝技术,吞吐量高
  • 快速持久化;可以在 O(1) 的系统开销下进行消息持久化
  • 支持数据批量发送和拉取
  • ...

缺点:

  • 单机超过 64 个队列/分区时,性能会出现明显劣化
  • 使用短轮询方式,实时性取决于轮询间隔时间
  • 消费失败不支持重试
  • ...

那么该如何选择这些消息中间件呢?

Kafka 一般用于追求高吞吐量的业务, 它非常适合配合大数据类的系统进行实时数据计算,日志采集的场景;RocketMQ 则可用于追求高可靠性的场景中,譬如电商领域,在双十一这种会发生大量交易涌入,需要进行业务的削峰限流,并且对可靠性要求很高时,我们可以选择使用 RocketMQ;RabbitMQ 比起 Kafka 与 RocketMQ 可以说是“麻雀虽小,五脏俱全”,它比较适合数据量没有那么大,但要求功能完备的公司,虽然 RabbitMQ 基于 erlang 开发,但是 RabbitMQ 的社区活跃度很高,更新维护速度也快。

我们的 Java 面试八股文之消息队列篇便会以 RabbitMQ 这个中间件为主,进行面试题的分析与讲解~

2、什么是 AMQP 协议?请描述 Direct,Fanout,Topic 三种 Exchange 的区别?


什么是 AMQP 协议?

AMQP(Advanced Message Queuing Protocol) 是一套为面向消息中间件而设计的协议,基于此协议,客户端和消息中间件之间可以进行通讯。

我们熟知的消息中间件:RabbitMQ 则是 AMQP 协议的一个实现;而 AMQP 协议则规定了 RabbitMQ 对外的接口规范。

AMQP 模型图如下所示:

关于模型图中的名词解释:

1. Publisher 与 Consumer

Publisher,也可以叫做 Producer;它是消息的生产者,也是消息的发送者。

Consumer 则是消息的消费者。

Publisher 会将消息发送给交换机(Exchange),交换机会根据消息的路由键(Routing Key)以及交换机与队列的绑定关系(Binding)将消息路由转发到相应的队列(Queue)中,并由消费者(Counsumer)进行消费。

2. Routing Key

Routing Key 即路由键,生产者将消息发送给交换机时,会指定一个 Routing Key,Routing Key 决定消息去往哪里。通俗解释的话,如果我们将 Publisher 形象地比作快递发件方,消息比作快递,那么 Routing Key 就相当于快递邮寄的地址。

3. Message Broker 与 Virtual Host

Message Broker 是用于接收和分发消息的应用;譬如,RabbitMQ 就是一个 Message Broker。

Virtual Host 则是虚拟的 Broker,用于将内部多个单元划分隔开。

4. Connection 与 Channel

Connection 是 Publisher/Consumer 与 Broker 之间的 TCP 连接。

Channel 是 Connection 内部建立的逻辑连接,通常每个线程创建一个单独的 Channel。

5. Exchange

Exchange 即:交换机;它是 AMQP 协议中最为重要的组件,并承担着最核心的功能——路由转发;如果沿用上面的例子来说明的话,Exchange 则相当于快递的分拨中心。Publisher 将消息发送至 Exchange 交换机,Exchange 交换机通过 Routing Key 和 Queue 与 Exchange 的绑定关系(Binding),将消息路由到对应的队列中。

6. Binding

Binding 是 Exchange 与 Queue 之间的虚拟连接,是消息分发的依据。在绑定多个 Queue 到同一个 Exchange 时,这些 Binding 允许使用相同的 Binding Key。

7. Queue

Queue 即队列,消息最终会被消费者从队列中取走并消费。

请描述 Direct,Fanout,Topic 三种 Exchange 模式的区别?

Exchange 是 AMQP 协议与 RabbitMQ 的核心组件,我们在上文提到了 Exchange 的功能就是根据 Routing Key 与绑定关系(Binding)将消息路由发送至相应的队列中。

最常用的三种 Exchange 模式为:

  • Direct
  • Fanout
  • Topic

接下来,我们就一起看一下这三种 Exchange 的区别。

1. Direct Exchange

当消息携带的 Routing Key 和交换机与队列的 Binding Key 一致时,Direct Exchange 则将消息分发到对应的队列中。

如上面的动图所示:Exchange 的类型为 Direct,生产者发送的消息携带的 Routing Key 为 “orange”,消息只会流向 Binding Key 同样为 “orange” 的队列 “q1”。

题外话 🌺:

我使用的工具为 RabbitMQ Simulator,它是一个可以模拟 RabbitMQ 消息队列发送消息的在线工具。RabbitMQ Simulator 可以构建出消息发送拓扑图,进而帮助我们迅速理解不同的 Exchange 模型,以及整个 AMQP 模型。

链接 🔗:http://tryrabbitmq.com/

同时也向大家强烈推荐一个视频转 GIF 的在线免费工具 ezgif。

链接 🔗:https://ezgif.com/

2. Fanout Exchange

fanout 翻译为扇形展开,顾名思义,Fanout Exchange 可以和任意的多个队列绑定起来,无论绑定在消息上的 Routing Key 是什么,当消息发送至 Fanout Exchange 时,该消息则会被拷贝并路由到所有绑定到该交换机上的队列中,这一种方式也就是我们俗称的广播方式,非常容易理解。

3. Topic Exchange

Topic Exchange 是应用最为灵活的一种 Exchange 模式,它会根据 Routing Key 以及通配规则,将消息路由发送到匹配的 Queue 中。

通配规则为:

  1. 全匹配,全匹配和 Direct Exchange 相同
  2. Binding Key 中,# 表示可以匹配任意个数的单词
  3. Binding Key 中,* 表示可以匹配任意一个单词

譬如下面的动图所示,如果一条消息携带的 Routing Key 为 kim.orange.fly,那么这条消息将被发送到 “q1” 队列中,因为 “q1” 的 Binding Key 为 *.orange.*,根据通配规则,“q1” 的 Binding Key 与 Routing Key 匹配;如果一条消息携带的 Routing Key 为 lazy.kim.boy,那么这条消息将被发送到 “q3” 队列中,因为 “q3” 的 Binding Key 为 lazy.#,根据通配规则,“q3” 的 Binding Key 与 Routing Key 匹配。

3、RabbitMQ 如何保证消息的可靠性?


RabbitMQ 该如何保证一条消息的可靠性呢?

有以下几点:

  • AMQP 事务与发送端的 Confirm 机制保证了生产者的消息是否有成功发送到 RabbitMQ 服务器
  • RabbitMQ 的消息返回机制保证了消息是否可以被正确路由
  • 消费端的手动 Confirm 机制保证消息是否从 RabbitMQ 队列成功发送至消费端,并被消费端消费
  • RabbitMQ 的消费端限流机制限制了消息推送速度,保证了消息接收端服务的稳定
  • 消息 TTL 机制保证了 RabbitMQ 服务器不会有大量消息堆积而导致其崩溃
  • 死信队列保证了被 RabbitMQ 丢弃的消息可以被收集,以提供运维人员分析

AMQP 事务与发送端的 Confirm 机制

对于消息的发送方,即生产者来说,需要知道一条消息在被发送后,是否有正确到达 Broker 代理服务器,RabbitMQ 有两种方式可以解决这一问题:

  1. AMQP 事务
  2. 发送端的 Confirm 机制

1. AMQP 事务

AMQP 协议自身提供了一种保证消息投递成功的事务模式,通过信道 Channel,我们可以开启事务,提交事务,当发生异常时,可以回滚事务:

  • channel.txSelect():开启事务
  • channel.txCommit():提交事务
  • channel.txRollback():回滚事务

来看下示例代码:

@Slf4j
public class RabbitTx {

    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String EXCHANGE = "exchange.test";

    public static void sendMsg() throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 Exchange
        channel.exchangeDeclare(
                EXCHANGE, BuiltinExchangeType.DIRECT,
                true,
                false,
                null
        );

        // 声明 Queue
        channel.queueDeclare(
                QUEUE,
                true,
                false,
                false,
                null
        );
        // Queue 与 Exchange 绑定
        channel.queueBind(
                QUEUE,
                EXCHANGE,
                ROUTING_KEY
        );

        try {
            // 开启事务
            channel.txSelect();
            // 发送消息
            String message = "hello world";
            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    null,
                    message.getBytes());
            // 提交事务
            channel.txCommit();

        } catch (Exception e) {
            log.error(e.getMessage());
            // 发生异常,回滚
            channel.txRollback();
        } finally {
            channel.close();
            connection.close();
        }
    }

    public static void main(String[] args) throws Exception {
        sendMsg();
    }
}

运行程序,使用 WireShark 抓包的结果如下所示:

通过结果显示,我们在发送消息的前后多出了开启事务与提交事务的步骤。

假如在消息发送之后,事务提交之前,程序发生了异常导致事务回滚,队列还能收到消息么?

目前队列 queue.test 中没有任何消息。

我们在 channel.basicPublish()channel.txCommit() 之间写了一段会抛出异常的代码

try {
    // 开启事务
    channel.txSelect();
    // 发送消息
    String message = "hello world";
    channel.basicPublish(
            EXCHANGE,
            ROUTING_KEY,
            null,
            message.getBytes());
    // 抛出异常
    Integer.valueOf("abc");
    // 提交事务
    channel.txCommit();
            
catch (Exception e) {
    log.error(e.getMessage());
    // 发生异常,回滚
    channel.txRollback();
finally {
    channel.close();
    connection.close();
}

运行程序,使用 WireShark 抓包的结果如下所示:

结果显示,我们在发送消息后由于程序抛出异常,所以消息并没有被提交,而是发生了事务回滚,此时队列 queue.test 中仍然显示收到的消息数为 0:

通过上面的例子,我们知道,AMQP 事务模型可以解决判断生产者的消息是否有成功发送到 Broker 的问题。事务提交成功则意味着消息被 Broker 成功接收;而一旦发生事务回滚,则意味着消息发送至 Broker 失败,进而,发送方可以做出相应的处理措施,对消息进行重发。

不过 AMQP 事务的性能非常差,这种方式我们需要了解,但并不推荐使用。

2. 发送端的 Confirm 机制

发送端有三种 Confirm 机制,来确认消息是否成功发送到 RabbitMQ,这三种确认机制为:

  • 单条同步确认
  • 多条同步确认
  • 异步确认

2.1 单条同步确认

单条同步确认模式为:消息发送端每发送一条消息至 RabbitMQ 成功,服务端就会回传给发送端一条同步确认(ACK),如果服务端超时未返回则说明消息发送失败。

关键代码为:

  • channel.confirmSelect():开启 Confirm
  • channel.waitForConfirms():等待服务端返回 ACK

来看下示例代码:

@Slf4j
public class SingleConfirm {

    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String EXCHANGE = "exchange.test";

    public static void sendMsg() {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );
            // 开启 Confirm
            channel.confirmSelect();
            for (int i = 0; i < 3; i++) {
                String message = "第" + i + "条消息";
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        message.getBytes());
                // 等待同步确认
                if (channel.waitForConfirms()) {
                    log.info("消息发送成功");
                } else {
                    log.error("消息发送失败");
                    // TODO message resend
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

    public static void main(String[] args) {
        sendMsg();
    }
}

代码十分简单,程序内,我们循环三次,让生产者发消息,每发一条消息都等待 Broker 的同步确认。

运行程序,使用 WireShark 抓包的结果如下所示:

我们可以看到,生产者每发送一条消息,都会收到 Broker 回传的同步确认(Basic.Ack),如果没有收到,则意味着消息发送失败,生产者便不会继续发送下一条消息。

2.2 多条同步确认

多条同步确认模式与单条同步确认模式的不同点在于:单条同步确认是生产者每发一条消息,就要确认一次,收到 ACK 后再发送下一条;而多条同步确认模式是批量发送消息,然后再进行确认。其使用方法与单条同步确认无任何区别。

这样做的优点是,多条同步确认相比于单条同步确认提升了处理消息的效率;缺点也很明显,一旦 channel.waitForConfirms() 方法返回了 false,那么生产者就需要将这一批次的消息全部进行重发,不仅效率没有提升,反而降低了系统的性能。

来看下示例代码:

@Slf4j
public class BatchConfirm {

    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String EXCHANGE = "exchange.test";

    public static void sendMsg() {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );
            // 开启 Confirm
            channel.confirmSelect();
            for (int i = 0; i < 3; i++) {
                String message = "第" + i + "条消息";
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        message.getBytes());
            }
            // 等待同步确认
            if (channel.waitForConfirms()) {
                log.info("消息发送成功");
            } else {
                log.error("消息发送失败");
                // TODO message resend
            }
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

    public static void main(String[] args) {
        sendMsg();
    }
}

运行程序,使用 WireShark 抓包的结果如下所示:

可以看到,生产者连续发送了三条消息,待消息全部发送完毕才会收到 Broker 回传的同步确认(Basic.Ack)。

2.3 异步确认

异步确认模式与同步确认的不同点在于,异步确认模式的发消息与确认是相互独立的事件,对于同步确认来说,无论是单条同步还是多条同步,消息发送者都需要发送消息,并等待确认后,才能发送下一条或下一批次的消息;而异步就不同了,发送消息与确认消息两个事件完全分离,发送消息的线程只管发消息,而异步线程则负责确认,判断消息发送成功还是失败。

来看下示例代码:

@Slf4j
public class AsyncConfirm {
    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String EXCHANGE = "exchange.test";

    public static void sendMsg() {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );
            
            // 开启 Confirm
            channel.confirmSelect();

            ConfirmListener confirmListener = new ConfirmListener() {

                // 确认成功,将调用 handleAck
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // deliveryTag 为发送端的消息序号
                    // multiple 为 true 时,说明确认的是多条消息;为 false 时,说明确认的是单条消息
                    log.info("Ack, deliveryTag:{},multiple:{}", deliveryTag, multiple);
                }

                // 确认失败,将调用 handleNack
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("Nack, deliveryTag:{},multiple:{}", deliveryTag, multiple);
                }
            };

            channel.addConfirmListener(confirmListener);

            for (int i = 0; i < 10; i++) {
                String message = "第" + i + "条消息";
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        message.getBytes());
            }

            Thread.sleep(10000);

        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

    public static void main(String[] args) {
        sendMsg();
    }
}

执行程序,输出结果为:

[AMQP Connection 127.0.0.1:5672] INFO producter.AsyncConfirm - Ack, deliveryTag:5,multiple:true
[AMQP Connection 127.0.0.1:5672] INFO producter.AsyncConfirm - Ack, deliveryTag:9,multiple:true
[AMQP Connection 127.0.0.1:5672] INFO producter.AsyncConfirm - Ack, deliveryTag:10,multiple:false

从程序的输出结果来看,我们可以知道,生产者将这十条消息分成三个批次发出且成功收到了三次确认,前两个批次为一次性发送多条消息,最后一个批次单独发送了一条消息(multiple 为 fasle)。

使用 WireShark 抓包的结果如下所示:

看过代码后,我们先来思考下异步确认模式的缺点。

假如消息发送失败,要如何处理?我们知道发送消息和消息确认是两个不同的线程,假如某一条消息发送失败,那么在消息确认的异步线程中的 handleNack() 方法里,我们就要对发送失败的消息进行重发。可是,异步线程如何才能知道发送失败的消息的具体内容是什么呢?

这里面我们就需要采用一些可以让线程之间进行通信的方法,譬如发送消息的时候,我们可以将消息的 deliveryTag 和消息体存入到数据库中,这样在负责消息确认的异步线程中,我们就可以从数据库里查到这条消息的具体内容了。

不过,这样的操作无疑增加了系统的复杂性,而这便是异步确认模式不太常用的原因,也是它的缺点所在。

异步确认模式的优点就不用多说了,首先它的性能必然要比单条同步确认要好,也是远远高于 AMQP 事务的,其次它对失败消息的重发效率也比多条同步确认高。

3. 如何保证消息发送成功

通过上文的讲解,我们已经知道这个问题的答案了,如何保证生产者的消息可以到达 RabbitMQ 服务器或者说是 RabbitMQ Broker 呢?

有两种方法:

  • 第一种是通过 AMQP 事务
  • 第二种是通过 RabbitMQ 消息发送端的 Confirm 模式(单条,多条,异步)

推荐使用单条同步确认,虽然它的性能不是最优的,但是其原理简单,不易出错。

消息返回机制

当一条消息发送到 Broker 后,Exchange 交换机会根据这条消息携带的 Routing Key,和 Queue 与 Exchange 的绑定关系(Binding),将消息路由到正确的队列中。如果这条消息的 Routing Key 不能匹配任何与该 Exchange 绑定的队列的 Binding Key,将无法去往任何一个队列,我们则称这条消息为一条不可达的消息。

消息返回机制正是一种维系不可达的消息与生产者关系的保障策略,通俗解释的话:消息返回机制就是一种监听机制,它监听生产者发送到 Broker 的消息是否可达,如果消息不可达,就会返回一个信号通知消息发送端,消息发送端便可以做出相应的处理;反之,如果消息被正确路由,则不会返回任何信号。

开启消息返回机制的关键代码:

// 开启监听
// replyCode:类似于 HTTP 返回码,用于表示消息路由结果的字码
// replyText:返回信息
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
    log.info("Message Return");
    // TODO:说明消息不可达,可执行对应的操作,譬如告警,发邮件通知运维人员等等
});
// 发送消息
channel.basicPublish(
    "exchange",
    "routingKey",
    true,
    null,
    message.getBytes()
);

首先,我们要为 channel 添加一个 ReturnListener,即:消息返回监听器,当消息不可达时,程序便会回调监听器的 handleReturn 方法(异步回调),在该方法中,我们可以获取到消息路由结果的字码,返回信息,Exchange 名称,Routing Key 等内容;然后我们需要在 basicPublish 方法中,将 Mandtory 对应的参数项设置为 true,Mandatory 设置为 false 时,RabbitMQ 将直接丢弃无法被路由的不可达消息;而 Mandatory 设置为 true 时,RabbitMQ 便会处理不可达的消息。

示例代码:

@Slf4j
public class ReturnListenerTest {

    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String WRONG_ROUTING_KEY = "key.wrong";

    public static final String EXCHANGE = "exchange.test";

    public static void sendMsg() {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );

            String message = "send message";

            // 开启监听
            // replyCode:类似于 HTTP 返回码,用于表示消息路由结果的字码
            // replyText:返回信息
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                log.info("-------- Message Return --------");
                log.info("replyCode:{}", replyCode);
                log.info("replyText:{}", replyText);
                log.info("exchange:{},routingKey:{}", exchange, routingKey);
                // TODO:说明消息不可达
            });

            // 发送消息
            channel.basicPublish(
                    EXCHANGE,
                    WRONG_ROUTING_KEY,
                    true,
                    null,
                    message.getBytes());

            Thread.sleep(10000);


        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

    public static void main(String[] args) {
        sendMsg();
    }
}

在示例程序中,消息携带的 Routing Key 为我们定义的 WRONG_ROUTING_KEY,Exchange 与队列绑定的 Binding Key 为我们定义的 ROUTING_KEY,同时我们使用的 Exchange 模式为 Direct,所以生产者发送的消息将无法被路由。运行程序,输出结果为:

[AMQP Connection 127.0.0.1:5672] INFO producter.ReturnListenerTest - -------- Message Return --------
[AMQP Connection 127.0.0.1:5672] INFO producter.ReturnListenerTest - replyCode:312
[AMQP Connection 127.0.0.1:5672] INFO producter.ReturnListenerTest - replyText:NO_ROUTE
[AMQP Connection 127.0.0.1:5672] INFO producter.ReturnListenerTest - exchange:exchange.test,routingKey:key.wrong

RabbitMQ 的消息返回机制确保了消息可以被正确路由,如果出现路由异常,消息不会丢失。

消费端的确认机制

通过上文,我们知道了,发送端的 Confirm 机制保证了消息是否从发送端成功发送到 RabbitMQ,那么我们自然也会想到,对于 RabbitMQ 与消费端之间,也必然存在一种机制,来确保消息是否从 RabbitMQ 队列成功发送至消费端,并被消费端消费。而这种机制就是消费端的 Confirm 机制。

默认情况下,消费端接收消息时,会自动回传给 RabbitMQ 一条 ACK,来通知 RabbitMQ,“我已经收到你的消息了”,在代码中的体现就是将消费者的 basicConsume 方法中的 autoAck 设置为 true:

channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> {
});

DeliverCallback deliverCallback = ((consumerTag, message) -> {
    // do somthing ...
});

不过,将 autoAck 设置为 true 的这种做法却是不被推荐的。

1. 将 autoAck 设置为 true 会有什么问题?

我们要知道的是,RabbitMQ 在收到消费端发来的 ACK 后,会将消息从内存中移除。

那么试想这样一种情况,RabbitMQ 队列收到了消费端发送的 ACK 后,将消息从队列中移除,而在此时,消费端发生宕机,因为宕机,消费端没有正确处理这条消息,这样便会引起消息的丢失!

取而代之的做法就是,使用手动 ACK。

2. 消费端的手动确认机制

为了避免自动 ACK 带来的消息丢失问题,我们可以使用消费端的手动 ACK 机制。

当我们设置了手动 ACK 后, 消费端收到消息后不会自动“签收”,而是在我们的业务代码中显示地去进行“签收”;RabbitMQ 会等待消费者显式地回传确认信号,这样做的好处是,消费者将会有足够的时间去处理消息,等到消费者把这条消息“真正地”消费后,才去回传 ACK。大家可以去体会下“真正地”消费这句话的意思,为什么自动 ACK,不代表消费者真正地消费了消息呢?其原因在于消费端的 Confirm 机制本身就是异步的,有可能 RabbitMQ 已经收到了 ACK,但是消费端的业务逻辑还没有处理完毕。

在代码中的体现则是将消费者的 basicConsume 方法中的 autoAck 设置为 false;并且我们要在 deliverCallback 中,指定消费者的操作:

channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {
});

DeliverCallback deliverCallback = ((consumerTag, message) -> {
    // do somthing ...
    channel.basicAck(tag,multiple);
});

消费者可以进行的四种基本操作有:

  • basicAck
  • basicRecover
  • basicReject
  • basicNack

basicAck 为确认消息,方法中的第二个参数 multipletrue 时,表示批量确认操作,为 false 时,表示对单个消息进行确认操作;basicRecover 表示是否使消息重新恢复至队列中,true 表示重新发回到队列,并尽可能地将之前 recover 的消息投递给其他的消费者,false 则会使消息重新投递给自己;basicReject 为拒收消息,方法中的第二个参数 requeuetrue 时,表示是否将拒收的消息重回队列,为 false 时,表示丢弃或将消息发送至死信队列;basicNack 为批量拒绝,方法参数中的 multiple 设置为 true 时,可以对消息进行批量拒绝。

接下来,我们来看一个示例程序:

@Slf4j
public class ConsumerConfirm {

    public static final String QUEUE = "queue.test";

    public static final String ROUTING_KEY = "key.test";

    public static final String EXCHANGE = "exchange.test";

    public Channel channel;

    public static void main(String[] args) {
        ConsumerConfirm test = new ConsumerConfirm();
        test.doTest();
    }

    public void doTest() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            this.channel = channel;

            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );

            String message = "a message";

            // 生产者发送消息
            log.info("----- send message -----");
            log.info("message : {}", message);

            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    null,
                    message.getBytes());

            // 消费者消费消息
            channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {
            });

            Thread.sleep(10000);

        } catch (Exception e) {
            log.error("error message : {}", e.getMessage());
        }
    }

    DeliverCallback deliverCallback = ((consumerTag, message) -> {
        try {
            String msgBody = new String(message.getBody());

            log.info("----- receive message -----");
            log.info("message : {}", msgBody);
            // TODO: 实现消费消息的业务逻辑
            // 消费端手动确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    });
}

channel.basicPublish()channel.basicAck() 这两处打上断点;这一次,我们将会使用 DEBUG 模式来运行程序,进行分析。

运行程序之前,我们的队列为空。

Ready 表示待消费的消息数量,即:还未发送给消费者的消息数量;Unacked 表示待应答的消息数量,即:消息已经发送给了消费者,但消费者还未返回 ACK;Total 则是二者的加和。

当程序执行完channel.basicPublish()方法后:

我们看到队列中已经存在了一条待处理的消息。

程序执行到 channel.basicAck() 方法前时:

我们看到,消息已经发送给消费者了,但是该消息处于一个等待消费者确认的状态。

执行完 channel.basicAck() 方法后,队列中就没有待处理的消息了。

通过本小节的学习,想必大家对消费端的确认机制已经有了一个良好的认识。示例中,我只给出了 basicAck 的代码及断点流程分析,大家可以自行尝试,体会一下 basicRejectbasicRecoverbasicNack 这些操作分别都是怎样的~

消费端限流,TTL,死信队列

1. 消费端限流机制

为什么要对消费端做限流处理?

试想这样一种场景:在业务高峰期时,由于消息发送端与消息接收端性能的不一致(发送端推送消息的速度远大于消费端处理消息的速度),导致大量的消息被一次性推送给消息接收端,从而造成消息接收端服务发生崩溃。消息接收端服务宕机下线后,期间消息队列积压了大量的消息。那么当这个微服务重新上线后,又一次性收到了大量的消息,导致继续崩溃...

针对以上问题,RabbitMQ 提供了一种 QoS(Quality of Service)即:服务质量保障功能。它的原理是在非自动确认消息开启的前提下,当消费端有一定数量的消息未被 ACK 确认时,RabbitMQ 将不会给消费端推送新的消息。

我们在消费端的代码中只需开启非自动确认,并使用 channel.basicQos() 方法即可开启 QoS 功能。

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount) throws IOException;

basicQos 有三种重载方法,这些参数的含义为:

  • prefetchCount:表示针对一个消费端最多可以推送多少未 ACK 确认消息
  • global:当其值设置为true时,会针对整个消费端进行限流,当值为false时,仅针对当前的 channel 进行限流
  • prefetchSize:单条消息大小限制,一般设置为 0,代表不限制

需要注意的是:带有 prefetchSizeglobal 这两个参数项的方法, RabbitMQ 暂时没有实现(仅在 AMQP 协议中定义了接口)。

接下来,我们来看一个示例程序:

@Slf4j
public class QoS {

    public static final String QUEUE = "queue.qos.test";

    public static final String ROUTING_KEY = "key.qos.test";

    public static final String EXCHANGE = "exchange.qos.test";

    public Channel channel;

    public static void main(String[] args) {
        QoS test = new QoS();
        test.doTest();
    }

    public void doTest() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            this.channel = channel;

            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    null
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );

            String message = "a message";

            // 生产者发送消息
            log.info("----- send message -----");
            for (int i = 0; i < 10; i++) {
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        message.getBytes());
            }
            
            // 消费者消费消息
            channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {
            });

            Thread.sleep(100000);

        } catch (Exception e) {
            log.error("error message : {}", e.getMessage());
        }
    }

    DeliverCallback deliverCallback = ((consumerTag, message) -> {
        try {
            String msgBody = new String(message.getBody());

            log.info("----- receive message -----");
            log.info("message : {}", msgBody);
            Thread.sleep(5000);
            // 消费端手动确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    });
}

为了模拟消费者消费消息的速度远远落后于生产者推送消息的速度,我在 deliverCallback 中,使用了 Thread.sleep(5000)

在未开启消费端限流 QoS 功能时,运行程序:

在监控台中,我们可以看到,生产者发送到队列的十条消息,已经全部由 RabbitMQ 一次性地发送给消费端了(Unacked 表示待应答的消息数量,即:消息已经发送给了消费者,但消费者还未返回 ACK)。

我们在代码中添加 QoS 功能开启:

// ... ...
// 消费端限流机制 QoS
channel.basicQos(2);
// 消费者消费消息
channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {
});
// ... ...

basicQos(2) 表示消费端最多可以处理 2 条未 ACK 确认的消息。

重新运行程序:

我们看到,正如我们所想,消费者最多可以处理 2 条消息,而其余的消息仍然在队列中未发送给消费者(Ready 表示待消费的消息数量,即:还未发送给消费者的消息数量)。

QoS 的重要意义不仅仅在于可以保护消费端,同时它也有利于消费端的横向扩展,使得消费端可以分布式处理堆积的消息。

假如我们没有设置 QoS 功能,仅启动了一个消费者,那么堆积的消息仍然已经全部发送给了这个消费者(消息状态为 Unacked)。就算我想启动更多的消费者去处理这些堆积的消息也无济于事。

反之,如果我们开启了 QoS 功能,堆积的消息仍然在队列中处于 Ready 的状态,这时,我们就可以横向扩展更多的消费者帮忙处理这些消息了~

2. 消息过期机制

在默认的情况下,当消息进入到队列后,会永久存在,直到被消费。但是这样一来,就会造成 RabbitMQ 产生大量的消息堆积,这给 RabbitMQ 自身造成了很大的压力。为了解决这个问题,RabbitMQ 支持对发送的消息设置过期时间(我们简称:消息 TTL),以及对整个队列设置消息的过期时间(我们简称:队列 TTL)。TTL 的机制是当消息过期时,如果有设置死信队列,那么这条消息会被转发到专门接收死信的队列中,如果没有设置死信队列,那么消息会被队列永久移除。

2.1 消息 TTL

用法:

// 设置单条消息 TTL 为 1 min
AMQP.BasicProperties properties = new AMQP.BasicProperties()
        .builder()
        .expiration("60000")
        .build();


channel.basicPublish(
        EXCHANGE,
        ROUNTING_KEY,
        properties,
        message.getBytes()
);

如代码所见,我们只需要在 channel.basicPublish 方法中,对 properties 参数设置 expiration 属性即可,单位为毫秒,示例代码中,我们为发送的消息设置的 TTL 为 1 分钟。

2.2 队列 TTL

用法:

Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl"60000);
// x-expire 为队列的存活时间,如果在一定的时间内,队列没有接收到消息,队列会被删除;不要加入这样一个参数
// args.put("x-expire",60000);
// 声明队列 Queue
channel.queueDeclare(
        QUEUE,
        true,
        false,
        false,
        args
);

如代码所见,我们可以在声明队列的 channel.queueDeclare 方法中,设置 args 参数,args 是一个 Map,我们为其加入 x-message-ttl 这个键,对应的值就是规定队列中的消息的 TTL,单位为毫秒。

需要注意的是,args 还有一个 x-expire 的键, 它特别容易和 x-message-ttl 搞混。x-expire 是为队列设置一个存活时间,如果在这个时间内,队列没有接收到任何消息,那么整个队列会自动删除。我们在开发代码中尽量不要直接删除队列,这不是一个值得提倡的做法。

还有一个问题是,如果我们既给消息设置了 TTL,也给队列设置了 TTL,究竟哪个会起作用呢?

答案很简单,自然是按照最短的时间来啦~ 木桶效应,哪个先到时间,哪个起作用。

3. 死信队列

我先来解释一下,什么叫做死信(Dead Message)。

当一条消息在队列中出现以下三种情况时,就会被队列标记成一条死信:

  1. 消息被拒绝(reject/nack),并且 requeue = false
  2. 消息过期
  3. 队列达到了最大长度

当一条消息成为死信后,就会被队列直接丢弃。但是,运维人员并不希望这些消息被直接丢弃,而是希望收集这些消息,找到它们被丢弃的原因。而死信队列便是做这样一件事的~

RabbitMQ 支持为队列配置死信队列,当一条消息在队列中变成死信后,队列不会将其直接删除,而是将这条死信重新推送到死信交换机(Dead-Letter Exchange)上,死信交换机和普通的交换机没有任何区别,只不过它的作用是用来专门处理死信的。死信交换机会将死信投递到与其绑定的队列中,这个队列就是死信队列(Dead-Letter Queue),当然死信队列和普通的队列没有任何区别,只不过它的作用是用来专门接收死信交换机路由传递的死信而已。

死信队列的设置方法:

  1. 设置转发与接收死信的死信交换机和死信队列
    • Exchange:channel.exchangeDeclare(DL-exchangeName)
    • Queue:channel.queueDeclare(DL-queueName)
    • RoutingKey:#;如果我们使用 Topic Exchange,当 RoutingKey 设置为 # 时,死信队列则可以接收任何消息
  2. 在需要设置死信的队列中加入参数
    • x-dead-letter-exchange = DL-exchangeName

示例代码:

@Slf4j
public class TestDeadLetter {

    public static final String QUEUE = "queue.letter.test";

    public static final String ROUTING_KEY = "key.letter.test";

    public static final String EXCHANGE = "exchange.letter.test";

    public static final String DEAD_EXCHANGE = "exchange.dlx";

    public static final String DEAD_QUEUE = "queue.dlx";

    public Channel channel;

    public static void main(String[] args) {
        TestDeadLetter test = new TestDeadLetter();
        test.doTest();
    }

    public void doTest() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            this.channel = channel;

            // 声明死信交换机
            channel.exchangeDeclare(
                    DEAD_EXCHANGE,
                    BuiltinExchangeType.TOPIC,
                    true,
                    false,
                    null
            );

            // 声明死信队列
            channel.queueDeclare(
                    DEAD_QUEUE,
                    true,
                    false,
                    false,
                    null
            );

            // 将死心队列和死信交换机进行绑定
            channel.queueBind(
                    DEAD_QUEUE,
                    DEAD_EXCHANGE,
                    "#"
            );

            // 声明 Exchange
            channel.exchangeDeclare(
                    EXCHANGE, BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null
            );

            Map<String, Object> args = new HashMap<>(16);
            // 设置队列 TTL 为 10 s
            args.put("x-message-ttl"10000);
            // 绑定在死信交换机
            args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 声明 Queue
            channel.queueDeclare(
                    QUEUE,
                    true,
                    false,
                    false,
                    args
            );
            // Queue 与 Exchange 绑定
            channel.queueBind(
                    QUEUE,
                    EXCHANGE,
                    ROUTING_KEY
            );

            String message = "a message";

            // 生产者发送消息
            log.info("----- send message -----");

            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    null,
                    message.getBytes());

            Thread.sleep(15000);

            int messageCountInLetterTestQueue = channel.queueDeclarePassive(QUEUE).getMessageCount();
            log.info("queue.letter.test 队列中的消息数为:{}", messageCountInLetterTestQueue);

            int messageCountInDeadQueue = channel.queueDeclarePassive(DEAD_QUEUE).getMessageCount();
            log.info("queue.dlx 队列中的消息数为:{}", messageCountInDeadQueue);
            
        } catch (Exception e) {
            e.printStackTrace();
            log.error("error message : {}", e.getMessage());
        }
    }
}

在示例程序中,我们设置消息在队列的存活时间为 10 秒,在消息发送后,没有消费者去消费该消息,那么这条消息将会被标记为死信,转发到死信交换机并路由转发到死信队列。

程序运行后,输出如下:

[main] INFO mq.TestDeadLetter - ----- send message -----
[main] INFO mq.TestDeadLetter - queue.letter.test 队列中的消息数为:0
[main] INFO mq.TestDeadLetter - queue.dlx 队列中的消息数为:1

通过管控台,我们也可以看到,死信队列中已经存入了一条死信:

总结

在今日的文章分享中,我向大家简单介绍了 RabbitMQ 的基本概念。

通过这篇文章,你需要知晓这些问题的答案:

  1. 什么是消息队列?消息队列有什么作用?
  2. 什么是 AMQP 协议?Direct,Fanout,Topic 三种 Exchange 有什么区别?
  3. RabbitMQ 如何保证消息的可靠性?
  4. 请介绍一下 RabbitMQ 发送端的 Confirm 机制?
  5. 请介绍一下 RabbitMQ 消息返回机制?
  6. 请介绍一下 RabbitMQ 消费端的确认机制?
  7. 消费端的 basicConsume 方法中,将 autoAck 参数设置为 true 会有什么问题?
  8. 消费端的限流机制有什么作用?
  9. 如何为消息设置过期时间?
  10. 什么是死信队列?
  11. 哪些情况下,一条消息会被标记为死信?

好啦,至此为止,这篇文章就到这里了,Java 面试八股文系列后面依旧会继续更新并查缺补漏,感谢您的阅读与支持~~

欢迎大家关注我的公众号,在这里希望你可以收获更多的知识,我们下一期再见!

分类:

后端

标签:

Java

作者介绍

jrh
V1