jrh

V1

2023/04/17阅读:18主题:丘比特忙

Java 面试八股文之消息队列篇(二)

前言

封面是德国童话故事《The Duration of Life》的插页,它被格林兄弟收集于格林童话故事中,编号为 176。 这篇故事讲述的内容是,上帝在创造了世界后,为所有的生物都赋予了 30 年寿命。驴子觉得自己勤勤恳恳为人类服务,却换来人类对它的拳打脚踢,所以请求上帝让它少活几年,上帝便收回驴子 18 年的寿命;狗害怕衰老,因为衰老会让它咬不动食物,并且无法快乐地奔跑,便请求上帝让它少活几年,上帝便收回狗 12 年的寿命;猴子不想自己总是干着扮鬼脸逗人发笑的事儿,也请求上帝让它少活几年,上帝便收回了猴子 10 年的寿命。最后轮到人类,人类嫌弃 30 年寿命太短,请求上帝多赐予他们些生命。于是,上帝把从驴子那里拿来的 18 年赐给了人类,可是人类还是不满足;上帝又把从狗那里拿来的 12 年赐给人类,人类依旧不满足;最后,上帝将猴子多出来的 10 年也给了人类。这样,人的寿命便延长到了 70岁,起先 30 年是他们的本份,这个阶段人类健康、快乐,高兴地工作,生活也充满了欢乐;接下来是驴子的 18 年,在这个阶段,生活的负担压在肩上,人类虽然会辛勤地劳作,但是他们这种忠实的服务换来的却是别人的嫌弃与谩骂;然后是狗的 12 年,那时人类失去了牙齿,失去了健壮的骨骼,只能躺在床上哀怨;最后,就是猴子的 10 年,人类在最后的 10 年里,变得痴呆,糊里糊涂,成了孩子们捉弄、嘲笑的对象。

这是系列文章【 Java 面试八股文 】消息队列篇的第二期。

【 Java 面试八股文 】系列会陆续更新 Java 面试中的高频问题,旨在从问题出发,带你理解 Java 基础,数据结构与算法,数据库,常用框架等知识点。该系列前几期文章可以通过下方给出的链接进行查看~

往期文章

消息队列篇(二)

1. Spring-AMQP


Spring-AMQP 是 Spring 对 AMQP 协议的封装与扩展,它将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案中,使得开发者可以通过 Spring-AMQP 更简单方便地完成声明组件(队列,交换机等),收发消息等工作。

Spring-AMQP 是一个抽象层,不依赖于特定的 AMQP Broker 的实现,这样做的好处在于,可以使用户只针对抽象层来进行开发,而不用关心底层具体的实现是什么。

本篇文章内容基于 spring-boot-starter-amqp:2.7.5 创作。

2. RabbitAdmin


RabbitAdmin 的基本使用

RabbitAdmin 是 Spring-AMQP 中的核心组件。顾名思义,RabbitAdmin 是用来连接、配置与管理 RabbitMQ 的,其主要功能包括:

  • declareExchange:创建交换机
  • deleteExchange:删除交换机
  • declareQueue:创建队列
  • deleteQueue:删除队列
  • purgeQueue:清空队列
  • declareBinding:创建绑定关系
  • removeBinding:删除绑定关系
  • ... ...

来看一个 🌰:

Producer

@Service
@Slf4j
public class Producer {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";


    public void initRabbit() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 声明 Exchange
        Exchange exchange = new DirectExchange(EXCHANGE, falsefalse);
        // 声明 Queue
        Queue queue = new Queue(QUEUE, false);
        // 声明 Binding
        Binding binding = new Binding(
                QUEUE,
                Binding.DestinationType.QUEUE,
                EXCHANGE,
                ROUTING_KEY,
                null
        );
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
    }
}

TestController

@RestController
public class TestController {

    @Autowired
    private Producer producer;

    @GetMapping("/test")
    public String test() {
        producer.initRabbit();
        return "success";
    }
}

启动 Spring Boot 项目,调用接口,我们可以在 RabbitMQ 管控台看到 Exchange,Queue,Binding 声明及创建成功:

除了手动调用 RabbitAdmin 方法这种方式以外,我们还可以通过 Spring Boot Config 声明式地完成队列,交换机,绑定关系的创建。

Spring-AMQP 充分地发挥了 Spring Boot 的 Convention Over Configuration ,即:约定优于配置的特性。我们可以通过 Spring Boot Config 将 RabbitAdmin 交给 Spring 管理,并声明式地将队列,交换机,绑定关系注册为 Bean,Spring Boot 会为我们优雅地完成这些组件的创建:

@Configuration
@Slf4j
public class RabbitConfig {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";

    /**
     * 声明队列 queue.test
     *
     * @return
     */

    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE);
    }


    /**
     * 声明交换机 exchange.test
     *
     * @return
     */

    @Bean
    public Exchange testExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 声明绑定关系
     *
     * @return
     */

    @Bean
    public Binding testBinding() {
        return new Binding(QUEUE,
                Binding.DestinationType.QUEUE,
                EXCHANGE,
                ROUTING_KEY,
                null);
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

如上面的代码所示,我们将 RabbitAdmin 注册为一个 Bean,交给 Spring 管理;并将 Exchange,Queue,Binding 都声明为了 Bean。

我们从 RabbitMQ 管控台中将队列,交换机删除后,启动 Spring Boot 项目。

项目启动完成,回到 RabbitMQ 管控台,我们可以发现,Spring Boot “神奇地”创建了我们在 Spring Boot Config 中声明的交换机,队列,以及绑定关系:

Spring-AMQP 是如何做到通过 Spring Boot Config 声明式创建 Exchang,Queue,Binding 的?

查看源代码,我们可以看到 RabbitAdmin 实现了多个接口,其中便有 ApplicationContextAwareInitializingBean 接口。

ApplicationContextAware 接口的作用就是获取应用上下文资源,实现了该接口的 Bean 便可以拿到 Spring 容器(ApplicationContext)。

InitializingBean 则是一个 Bean 的生命周期接口,对应一个 Bean 的初始化阶段。

在往期文章《Java 面试八股文之框架篇(二)》中,我向大家介绍了 Spring Bean 的完整生命周期,其中便有关于 InitializingBean 接口的解读。

该接口只有一个 afterPropertiesSet() 方法,当一个 Bean 实现了 InitializingBean 接口,那么在这个 Bean 的初始化阶段,便会自动调用 afterPropertiesSet() 方法,执行其初始化的逻辑。

我们跟随源码,来到 RabbitAdmin 实现的 afterPropertiesSet() 方法中,便会看到方法内有如下逻辑:

this.connectionFactory.addConnectionListener 该方法的作用是为 ConnectionFactory 添加连接监听器,一旦发现有连接,即会回调 Lambda 表达式内的逻辑。

进入到上图中红框圈出的 initialize() 方法:

由于 RabbitAdmin 实现了 ApplicationContextAware 接口,所以它可以获取到整个 Spring 上下文。在逻辑中,我们看到,它获取到了上下文中所有类型为 ExchangeQueueBinding 的 Bean。

获取到这些 Bean 后,RabbitAdmin 便使用如上方式,对 Exchange,Queue,Binding 进行了声明与创建。

总结归纳

  1. RabbitAdmin 实现了 ApplicationContextAware 接口与 InitializingBean 接口
  2. RabbitAdmin 在初始化方法 afterPropertiesSet() 中,首先获取到 Spring 容器中,所有类型为 ExchangeQueueBinding 的 Bean,接着对其进行声明与创建;所以,我们可以通过 Spring Boot Config 声明式创建 Exchang,Queue,Binding 。

3. RabbitTemplate


RabbitTemplate 发送消息的方法:send 与 convertAndSend 的区别是什么?

在上文中,我们了解了 Spring-AMQP 的核心组件——RabbitAdmin,知道了该如何使用 RabbitAdmin 连接配置客户端,并声明交换机,消息队列与绑定关系。本小节,我将向大家继续讲解 Spring-AMQP 另一个重要的核心组件——RabbitTemplate

RabbitTemplate 主要功能为收发消息,但是通常我们只使用其消息发送的功能。发送消息的方法为:

  • send
  • convertAndSend

先来看一下 send 方法的基本使用,示例代码如下:

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";

    /**
     * 声明队列 queue.test
     *
     * @return
     */

    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE);
    }


    /**
     * 声明交换机 exchange.test
     *
     * @return
     */

    @Bean
    public Exchange testExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 声明绑定关系
     *
     * @return
     */

    @Bean
    public Binding testBinding() {
        return new Binding(QUEUE,
                Binding.DestinationType.QUEUE,
                EXCHANGE,
                ROUTING_KEY,
                null);
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

我们依旧使用 Spring Boot Config,将 RabbitTemplate 注册为 Bean,交给 Spring 管理。

Producer

@Service
@Slf4j
public class Producer {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";


    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        String messageToSend = "test message";

        MessageProperties messageProperties = new MessageProperties();
        //  设置单条消息 TTL 为 1 min
        messageProperties.setExpiration("60000");
        Message message = new Message(messageToSend.getBytes(), messageProperties);
        CorrelationData correlationData = new CorrelationData();
        rabbitTemplate.send(
                EXCHANGE,
                ROUTING_KEY,
                message,
                correlationData
        );
        log.info("message sent");
    }
}

Producer 类中,我们使用了 rabbitTemplate.send 方法,发送了一条消息。

该方法(全参数重载)的第一个参数指定了交换机的名称;第二个参数为路由键名称;第三个参数接收一个 Message 对象:

Message message = new Message(messageToSend.getBytes(), messageProperties);

构建 Message 的第一个参数为消息体,消息体是一个 byte 数组,第二个参数为 MessageProperties 对象,该对象可以指定消息携带属性。示例代码中,我们指定了消息的 TTL,即失效时间为 1 分钟;send 方法的最后一个参数为一个 CorrelationData 对象,每一个发送的消息都要配备一个 CorrelationData 对象,该对象内部仅有一个 id 属性,用来表示当前消息的唯一性。

我们也可以手动指定这条消息唯一的 id,譬如:

CorrelationData correlationData = new CorrelationData(order.getID().toString()));

真实的业务场景中,我们一般会通过某种方式(譬如写入数据库)记录下这个 id,用来做纠错与对账。如果不指定 id,那么生成的 CorrelationData 对象将使用 UUID 来作为唯一 id。

TestController

@RestController
public class TestController {

    @Autowired
    private Producer producer;

    @GetMapping("/test")
    public String send() {
        producer.sendMessage();
        return "success";
    }
}

启动 Spring Boot 项目,调用接口。我们可以在 RabbitMQ 管控台中看到,消息发送成功:

convertAndSend 也是 RabbitTemplate 发送消息的方法之一。

convertAndSend 翻译为“转换并发送”。send 方法接收一个 Message 对象,convertAndSend 方法则可以直接传入一个对象,该对象将会在发送到 RabbitMQ Brocker 之前,被转换为 Message 对象。

来看示例代码:

@Service
@Slf4j
public class Producer {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";


    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        String messageToSend = "test message";
        CorrelationData correlationData = new CorrelationData();
        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                messageToSend,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //  设置单条消息 TTL 为 1 min
                        MessageProperties messageProperties = message.getMessageProperties();
                        messageProperties.setExpiration("60000");
                        return message;
                    }
                },
                correlationData
        );
        log.info("message sent");
    }
}

convertAndSend 方法(全参数重载)的第一个参数指定了交换机的名称;第二个参数为路由键名称;第三个参数接收一个 Object 对象,示例代码中,我传入的是一个 String 字符串,如果你愿意,可以传入任何对象,前提是该对象需要实现 Serializable 接口;第四个参数为一个 MessagePostProcessor 对象,MessagePostProcessor 的作用是让消息对象发送到 RabbitMQ Brocker 之前来操作消息对象,譬如为消息设置属性;最后一个参数为 CorrelationData

启动 Spring Boot 项目,调用接口。我们依旧可以在 RabbitMQ 管控台中看到,消息发送成功:

那么,convertAndSend 方法是如何做到将对象自动转换为消息对象的呢?

我们进入到 convertAndSend 方法:

可以看到 convertAndSend 的本质就是调用了 send 进行消息发送,不过它在内部调用了一个 convertMessageIfNecessary 方法,将我们传入的对象转换为了 Message 消息对象。

进入到 convertMessageIfNecessary 方法:

该方法首先会判断对象是否为 Message 类型,如果是,则强转并返回;如果不是,则会调用 getRequiredMessageConverter().toMessage() 方法。

getRequiredMessageConverter() 方法会返回一个 MessageConverter 对象,也就是说,convertAndSend 方法其实就是调用了 MessageConverter 对象的 toMessage 方法,将我们传入的对象转换为 Message 对象,并调用 send 方法进行消息发送。

关于 MessageConverter 这一组件,我会在稍后进行详细的介绍,先来总结一下 sendconvertAndSend 的区别:

  1. 首先,二者均为 RabbitTemplate 发送消息的方法。send 方法指定我们传入一个 Message 对象;而 convertAndSend 方法则可以直接传递一个对象,传入的对象需实现 Serializable 接口。
  2. convertAndSend 方法的本质就是调用了 MessageConvertertoMessage 方法,将我们传入的对象转换为 Message 对象,并调用 send 方法进行消息发送。

使用 RabbitTemplate 开启消息确认机制与消息返回机制

关于 RabbitMQ 消息确认机制与消息返回机制的相关内容我就不再赘述了,对这两个知识点有疑问的童鞋,可以看上一篇文章《Java 面试八股文之消息队列篇(一)》进行复习~

使用 RabbitTemplate 开启消息确认与消息返回机制的方法十分简单,我们只需在配置类中进行配置即可,方法如下:

RabbitConfig

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    // 设置开启消息确认类型为 CORRELATED
    connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    // 设置开启消息返回
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    // 开启消息返回机制
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnsCallback(returned -> {
        // 说明消息不可达
        log.info("message:{}", returned.getMessage().toString());
        log.info("replyCode:{}", returned.getReplyCode());
        log.info("replyText:{}", returned.getReplyText());
        log.info("exchange:{}", returned.getExchange());
        log.info("routingKey:{}", returned.getRoutingKey());
    });
    // 开启消息确认机制
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("send msg to Broker success");
            log.info("correlationData : {}", correlationData);
        } else {
            log.info("send msg to Broker fail");
            log.info("cause : {}", cause);
        }
    });
    return rabbitTemplate;
}

4. SimpleMessageListenerContainer


SimpleMessageListenerContainer 原理

1. 异步线程实现消息监听

上文中,我们学习了如何使用 RabbitAdmin 连接配置客户端,以及声明交换机,队列,绑定关系;也学习了如何使用 RabbitTemplate 发送消息。那么接下来,我们需要做的便是让消费者监听队列,并消费消息了。

在学习 Spring-AMQP 的消息监听容器之前,我们不妨先思考一下,如果让你去实现消息监听,你会怎么做?

首先,消息监听肯定是一个异步操作,所以,我的想法是通过异步线程池来管理异步线程,让异步线程去调用消费消息的方法;其次,为了能让异步线程持续监听消息队列,我们需要某种手段让其维持监听状态,最简单的方法就是 while(true) Thread.sleep()

具体代码如下:

AsyncTaskConfig

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {

    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 设置线程池的核心线程数
        threadPoolTaskExecutor.setCorePoolSize(5);
        // 设置线程池最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(10);
        // 设置缓冲队列的长度
        threadPoolTaskExecutor.setQueueCapacity(10);
        // 设置线程池关闭时,是否要等待所有线程结束后再关闭
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 设置线程池等待所有线程结束的时间
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        // 设置所有线程的前缀名称
        threadPoolTaskExecutor.setThreadNamePrefix("Rabbit-Async-");
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

}

AsyncTaskConfig 为线程池配置类,我在该配置类上使用了 @EnableAsync 注解,该注解的作用是使线程池执行器开启异步执行功能。

通常,@EnableAsync 注解与 @Async 注解合用,@Async 注解可以标注在方法上,也可以标注在类上。当 @Async 注解标注在方法上时,则指定该方法为异步执行;当标注在类上时,则指定该类上的所有方法均异步执行。当然,只有在线程池配置类中标注 @EnableAsync 注解,@Async 注解才会生效。

Consumer

@Service
@Slf4j
public class Consumer {

    final String QUEUE = "queue.test";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Async
    public void handleMessage() {
        try (
                Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
                Channel channel = connection.createChannel(false);
        ) {
            channel.basicConsume(QUEUE,
                    true,
                    deliverCallback,
                    consumerTag -> {
                    }
            );
            while (true) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            log.error("error message : {}", e.getMessage());
        }
    }

    DeliverCallback deliverCallback = (this::handle);

    private void handle(String consumerTag, Delivery message) {
        String messageBody = new String(message.getBody());

        log.info("receive message");
        log.info("consumerTag : {}", consumerTag);
        log.info("messageBody : {}", messageBody);

    }
}

Consumer 为消费者,在 handleMessage 方法上,我使用 @Async 注解,指定该方法为异步执行。方法内,我使用了 channel.basicConsume 来消费消息,并使用了 while(true) Thread.sleep() 来维持监听状态。

RabbitmqTutorialApplication

@SpringBootApplication
public class RabbitmqTutorialApplication implements ApplicationRunner {

    @Autowired
    private Consumer consumer;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqTutorialApplication.classargs);
    }


    @Override
    public void run(ApplicationArguments args) throws Exception {
        consumer.handleMessage();
    }
}

RabbitmqTutorialApplication 为 Demo 工程的启动类,启动类实现了 ApplicationRunner 接口(或者可以实现 CommandLineRunner 接口),并重写了 run 方法,其作用为:使我们可以在启动类启动之后做一些事情,我们做的事情便是调用 Consumer 的异步方法 handleMessage,实现对消息队列的监听。

RabbitConfigProducer 等代码不变,为了节省文章篇幅就不再赘述了。

启动 Spring Boot 项目,调用接口。我们可以在控制台输出中看到,Producer 消息发送成功,Consumer 监听到了消息,并成功消费了消息;重复调用接口,Consumer 依旧能够监听到消息,并成功消费消息:

2023-04-06 23:30:14.655  INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-06 23:30:14.655  INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=fb01bfcb-ddc7-495b-8c6c-251911f154a7]
2023-04-06 23:30:14.656  INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer    : receive message
2023-04-06 23:30:14.656  INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer    : consumerTag : amq.ctag-0YmVWOIrKlKA85833g7VeQ
2023-04-06 23:30:14.656  INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer    : messageBody : test message
2023-04-06 23:30:16.528  INFO 143 --- [nio-8080-exec-3] c.d.rabbitmqtutorial.service.Producer    : message sent
2023-04-06 23:30:16.529  INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-06 23:30:16.529  INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=9bbf5139-76a3-4609-b03b-e73b9fbe5f7d]
2023-04-06 23:30:16.529  INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer    : receive message
2023-04-06 23:30:16.529  INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer    : consumerTag : amq.ctag-0YmVWOIrKlKA85833g7VeQ
2023-04-06 23:30:16.529  INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer    : messageBody : test message

2.SimpleMessageListenerContainer 的基本使用

在学习了异步线程调用这样一种逻辑后,我们来看一下 Spring-AMQP 提供的简单消息监听容器——SimpleMessageListenerContainer

SimpleMessageListenerContainer 的功能十分强大,其支持:

  • 设置同时监听多个队列,自动启动,自动配置 RabbitMQ
  • 设置消费者数量(最大数量,最小数量,批量消费)
  • 设置消息确认模式,是否重回队列,异常捕获
  • 设置是否独占,其他消费者属性等
  • 设置具体的监听器,消息转换器等
  • 支持动态设置,运行中修改监听器配置
  • ... ...

来看示例代码:

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";

    // ... ...

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列
        messageListenerContainer.setQueueNames(QUEUE);
        // 设置消费者线程数量
        messageListenerContainer.setConcurrentConsumers(3);
        // 设置最大的消费者线程数量
        messageListenerContainer.setMaxConcurrentConsumers(5);
        // 消费端开启手动确认
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置消息监听器
        messageListenerContainer.setMessageListener(new MyChannelAwareMessageListener());

        // 消费端限流
        messageListenerContainer.setPrefetchCount(20);
        return messageListenerContainer;
    }

}

MyChannelAwareMessageListener

@Slf4j
public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("receive message:{}"new String(message.getBody()));
        // 消费端手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // 重回队列
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        // 拒绝消息
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

我们依旧使用 Spring Boot Config 的方式,将 SimpleMessageListenerContainer 这一组件注册为 Bean,交给 Spring 管理。

在代码中,我设置了 SimpleMessageListenerContainer 监听的队列,消费者线程数量,并设置了消费端手动确认,开启了消费端限流;同时,我为其设置了自定义的消息监听器:MyChannelAwareMessageListener,该消息监听器实现了 ChannelAwareMessageListener 接口。

我们通常会使用两种消息监听器的实现:

  1. ChannelAwareMessageListener
  2. MessageListener

这二者的不同点在于,MessageListener 只能获取到 Message 信息,而 ChannelAwareMessageListener 则可以获取到 MessageChannel 的信息,我们便可以通过 Channel 开启消费端手动确认等设置。

启动 Spring Boot 项目,调用接口。我们可以在控制台输出中看到,消息被成功监听且消费:

2023-04-08 23:01:08.442  INFO 7713 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer    : message sent
2023-04-08 23:01:08.444  INFO 7713 --- [enerContainer-2] c.d.r.c.MyChannelAwareMessageListener    : receive message:test message
2023-04-08 23:01:08.450  INFO 7713 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-08 23:01:08.450  INFO 7713 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=9a48e245-8930-4047-94c2-dbac8fdfb583]

那么,SimpleMessageListenerContainer 是如何做到的呢?接下来,便由我带大家一探究竟~

3. SimpleMessageListenerContainer 原理

上图便是 SimpleMessageListenerContainer 继承链的 UML 类图。我们可以看到,它最终实现了 Lifecycle 接口。Lifecycle 用于对一个 Bean 生命周期的控制操作,该接口共定义了三个方法:

public interface Lifecycle {
    void start();
    void stop();
    boolean isRunning();
}

start/stop 对应启动与停止,isRunning 用于判断组件是否正在运行;当一个 Bean 实现了 Lifecycle 接口时,在 Bean 初始化完毕后,Spring 会自动调用该组件实现的 start 方法。

SimpleMessageListenerContainerstart 方法在父类 AbstractMessageListenerContainer 中实现:

可以看到,AbstractMessageListenerContainerstart 方法中,调用了 doStart 方法;而 SimpleMessageListenerContainer 对父类的 doStart 方法进行了重写:

我们顺着 doStart 的逻辑往下走,着重地看一下我在上图中用红框圈出的代码。

首先,在 doStart 方法内,调用了 initializeConsumers 方法,我们直观地理解该方法的含义为:对消费者进行初始化。

进入到 initializeConsumers 方法:

initializeConsumers 方法内调用了 createBlockingQueueConsumer 方法来创建消费者对象,消费者对象的类型为 BlockingQueueConsumerBlockingQueueConsumer 是 Spring AMQP 定义的消费者类,它具有自己的生命周期(包含 startstop 方法)。

回到 doStart 方法。在初始化消费者对象后,代码里创建了一个存储类型为 AsyncMessageProcessingConsumerHashSet 对象 processors;所有的 BlockingQueueConsumer 对象重新包装成了 AsyncMessageProcessingConsumer 对象,然后被 addprocessores 中,接着调用了 getTaskExecutor().execute(processor) 方法。

AsyncMessageProcessingConsumer 翻译为 “支持异步消息处理的消费者”;它是 SimpleMessageListenerContainer 的一个内部类,其实现了 Runnable 接口;而 getTaskExecutor() 方法将会返回一个 Executor 即:异步任务执行器,异步任务执行器的 execute 方法需要传入一个实现 Runnable 接口的“任务对象”,它会直接调用实现了 Runnable 接口的“任务对象”的 run 方法。

那么这段逻辑做的事情,翻译成人话就是:将 BlockingQueueConsumer 类型的消费者包装为实现了 Runnable 接口,可异步处理消息的 AsyncMessageProcessingConsumer,并丢进异步线程池中执行。

我们进入 AsyncMessageProcessingConsumer 实现的 run 方法,直接来看代码中的核心逻辑:

// ... ...
try {
  initialize();
  while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
      mainLoop();
  }
}
// ... ...

核心逻辑中主要做了两个关键操作:

  1. initialize
  2. mainLoop

首先,我们来看一下 initialize 初始化操作中做了什么:

在初始化逻辑中,调用了 BlockingQueueConsumer 生命周期的 start 方法,进入 start 方法:

start 方法中,有两个主要操作;passiveDeclarations 方法的作用为校验监听队列是否存在,setQosAndCreateConsumers 方法用于为客户端设置 Qos 以及消息订阅。进入到 setQosAndCreateConsumers 方法,我们可以看到其内部调用了 consumeFromQueue 方法,进入到 consumeFromQueue 方法:

该方法使用了 channel.basicConsume 订阅消息,它向 RabbitMQ Broker 发送指令,就相当于告诉服务器,我已经准备好了,如果监听队列有消息,你就把它推送给我~

RabbitMQ Brocker 接收到客户端发送的指令后,便会向客户端反馈,它通过 Basic.Deliver 指令类型将消息推送给客户端,一条消息对应一个 Basic.Deliver 指令的反馈,客户端收到服务端返回的指令后,将通过 ChannelN 这个类的 processAsync 方法进行处理,最终,将会调用 BlockingQueueConsumer 内部类 InternalConsumerhandleDelivery 方法:

handleDelivery 方法的核心操作就是将向 Brocker 发送的消息 offerBlockingQueueConsumer 对象存储的队列 queue

看完了 initialize 方法做的事情后,我们来看第二个核心操作 mainLoop

mainLoop 外层通过一个 while 无限循环套用,它做的事情就是从队列 queue 拿消息,并经过一系列操作最终传递到用户实现的 MessageListener 中。

mainLoop 的核心逻辑为 receiveAndExecutereceiveAndExecute 方法内部调用了 doReceiveAndExecute 方法,doReceiveAndExecute 有两处重要的逻辑:

  1. consumer.nextMessage
  2. executeListener

我们可以看到,nextMessage 方法的本质就是通过 queue.poll 来获取下一条消息的。

接着,我们进入到 executeListener 方法,不断跟踪,可以看到,该方法最终调用了 doInvokeListener 方法:

doInvokeListener 正是调用了我们实现的 MessageListeneronMessage 方法。

至此为止,我们也就清楚了 SimpleMessageListenerContainer 的原理,总结:

  1. SimpleMessageListenerContainer 实现了 Lifecycle 接口,在它完成初始化后,Spring 会自动调用它的 start 方法;SimpleMessageListenerContainer 的父类实现了 start 方法,而在 start 方法中会调用 SimpleMessageListenerContainerdoStart 方法。
  2. doStart 中做了两件事,首先就是调用了 initializeConsumers 方法,对消费者进行初始化,生成了 BlockingQueueConsumer;然后,就是将 BlockingQueueConsumer 类型的消费者包装为实现了 Runnable 接口,可异步处理消息的 AsyncMessageProcessingConsumer,并丢进异步线程池中执行。
  3. 异步线程池会调用 AsyncMessageProcessingConsumerrun 方法。run 方法中,首先就是调用了 initialize 方法,initialize 主要的作用就是为客户端设置 Qos 以及消息订阅等操作,当 RabbitMQ 服务端收到客户端发送的指令后,会将消息推送给客户端,本质便是将消息缓存到队列中(queue.offer());第二个核心的操作就是 mainLoop 操作,mainLoop 外层通过一个 while 无限循环套用,它做的事情就是从队列 queue 拿消息(queue.poll()),并经过一系列操作最终传递并调用到用户实现的 MessageListeneronMessage 方法中。

5. MessageListenerAdapter


MessageListenerAdapter 的基本使用

MessageListenerAdapter 即:消息监听适配器。话不多说,我们先来看一下它的基本使用:

MessageDelegate

@Slf4j
@Component
public class MessageDelegate {

    public void handleMessage(byte[] msgBody) {
        log.info("invoke handleMessage,msgBody : {}"new String(msgBody));
    }
}

首先,我定义了一个负责消息处理的代理类 MessageDelegate,该类有一个方法 handleMessage,方法传参为 byte 类型的数组。对消息处理的逻辑也很简单,只是打印日志。

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig {
    
    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";
    
    @Resource
    private MessageDelegate messageDelegate;
    
    // ... ...

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列
        messageListenerContainer.setQueueNames(QUEUE);
        // 设置消费者线程数量
        messageListenerContainer.setConcurrentConsumers(3);
        // 设置最大的消费者线程数量
        messageListenerContainer.setMaxConcurrentConsumers(5);
        // 消费端开启手动确认
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 消费端限流
        messageListenerContainer.setPrefetchCount(20);
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        // 设置代理
        messageListenerAdapter.setDelegate(messageDelegate);
        // 设置消息监听器
        messageListenerContainer.setMessageListener(messageListenerAdapter);
        return messageListenerContainer;
    }

}

RabbitConfig 配置类中,我将 MessageDelegate 组件注入,并通过如上方式进行了设置;我们可以看到,MessageListenerAdapter 对象可以被传入到 messageListenerContainer.setMessageListener() 方法中,这便说明了 MessageListenerAdapter 本质上就是消息监听器。

通过 UML 类图,也可以直观地看出,MessageListenerAdapter 实现了 MessageListener 接口:

启动 Spring Boot 项目,调用接口。可以在控制台输出中看到,我们自定义的 MessageDelegatehandleMessage 方法被成功调用了:

2023-04-10 19:33:17.038  INFO 50607 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer    : message sent
2023-04-10 19:33:17.040  INFO 50607 --- [enerContainer-1] c.d.r.delegate.MessageDelegate           : invoke handleMessage,msgBody : test message
2023-04-10 19:33:17.087  INFO 50607 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-10 19:33:17.087  INFO 50607 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=25d0b20d-4bbd-4ff7-9a54-41d9385bea83]

MessageListenerAdapter 原理

上文中,我介绍了 MessageListenerAdapter 的一个基本使用,大家可能会好奇:Spring AMQP 是如何找到并调用代理的 handleMessage 方法处理消息的?

我们知道,MessageListenerAdapter 本质就是一个消息监听器。在为 SimpleMessageListenerContainer 设置消息监听器后,它会回调用户实现的 onMessage 方法来处理消息,所以,MessageListenerAdapter 也必然实现了 onMessage 方法:

上图所示的代码,便是 MessageListenerAdapter 实现的 onMessage 方法,其中有两处重要的逻辑,我已经使用红框圈出,第一处为 getListenerMethodName 方法,第二处为 invokeListenerMethod 方法。

进入到 getListenerMethodName 方法:

我们看到,该方法首先会对 queueOrTagToMethodName 这个 HashMap 进行判断,如果该 HashMapsize 不为空,则会走 if中的逻辑;由于我们的代码中未设置该参数,所以将会走 getDefaultListenerMethod 的逻辑。

getDefaultListenerMethod 方法直接返回一个 defaultListenerMethod 常量,这个常量便是 handleMessage,而 handleMessage 便是 MessageListenerAdapter 默认调用的方法名:

第一处逻辑是通过 getListenerMethodName 方法获取到了监听方法名;那么第二处逻辑想必大家已经猜测到其含义了,invokeListenerMethod 方法会通过拿到的方法名,使用 org.springframework.util 包下的 MethodInvoker 工具类的 invoke 方法进行回调,其本质便是使用了 Java 的反射技术。因为篇幅限制,这里我就不带着大家一步一步地进行代码跟踪了~

也许有人会问,难道 MessageListenerAdapter 只能识别用户中介类的 handleMessage 方法么?我们是否可以自定义方法,让 Spring AMQP 进行调用呢?

答案当然是可以的,还记得 getListenerMethodName 中的逻辑么?该方法首先会对 queueOrTagToMethodName 这个 HashMap 进行判断,如果 HashMapsize 为空才会走 getDefaultListenerMethod 方法,所以,我们自然是可以通过设置 queueOrTagToMethodName,来使 Spring AMQP 识别到我们自定义的消息监听方法,代码如下:

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig {
    
    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";
    
    @Resource
    private MessageDelegate messageDelegate;
    
    // ... ...

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列
        messageListenerContainer.setQueueNames(QUEUE);
        // 设置消费者线程数量
        messageListenerContainer.setConcurrentConsumers(3);
        // 设置最大的消费者线程数量
        messageListenerContainer.setMaxConcurrentConsumers(5);
        // 消费端开启手动确认
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 消费端限流
        messageListenerContainer.setPrefetchCount(20);
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        // 设置自定义方法名
        Map<String, String> map = new HashMap<>(8);
        map.put(QUEUE, "handle");
        messageListenerAdapter.setQueueOrTagToMethodName(map);
        // 设置代理
        messageListenerAdapter.setDelegate(messageDelegate);
        // 设置消息监听器
        messageListenerContainer.setMessageListener(messageListenerAdapter);
        return messageListenerContainer;
    }

}

在代码中,我通过 MessageListenerAdapter 对象的 setQueueOrTagToMethodName 方法设置了 queueOrTagToMethodName 这个 HashMapHashMap 的键为队列名称,值为方法名称,示例代码中规定 MessageListenerAdapter 回调的监听方法名为 handle

MessageDelegate

@Slf4j
@Component
public class MessageDelegate {

    public void handleMessage(byte[] msgBody) {
        log.info("invoke handleMessage,msgBody : {}"new String(msgBody));
    }

    public void handle(byte[] msgBody) {
        log.info("invoke handle,msgBody : {}"new String(msgBody));
    }
}

启动 Spring Boot 项目,调用接口。可以在控制台输出中看到,我们自定义的 MessageDelegatehandle 方法被成功调用了:

2023-04-11 15:40:12.566  INFO 56939 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer    : message sent
2023-04-11 15:40:12.577  INFO 56939 --- [enerContainer-1] c.d.r.delegate.MessageDelegate           : invoke handle,msgBody : test message
2023-04-11 15:40:12.599  INFO 56939 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-11 15:40:12.599  INFO 56939 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=faff2552-52bd-47a2-8e90-110ee6106d5f]

总结:

  1. MessageListenerAdapter 本质就是一个消息监听器。
  2. MessageListenerAdapter 实现的 onMessage 方法中,首先会通过 getListenerMethodName 来获取消息监听的方法名,默认为 handleMessage;在拿到方法名后,它便会通过反射,来回调方法。
  3. 我们可以通过设置 queueOrTagToMethodName 来自定义 MessageListenerAdapter 回调的监听方法名。

6. MessageConverter


Jackson2JsonMessageConverter

在讲解 RabbitTemplate 时,我们已经简单地了解了 MessageConverter 这一组件了。

RabbitTemplateconvertAndSend 方法的本质便是调用了 MessageConverter 组件的 toMessage 方法,将我们传入的 Object 对象转换为 Message 对象,并调用 send 方法来进行消息发送。

MessageConverter 翻译为“消息转换器”。在之前,我们进行收发消息时,都会使用byte 数组作为消息体,但是,在真实的业务场景下,我们通常需要使用 Java 对象来作为消息体,这时,我们就可以使用 MessageConverter 在收发消息时,对消息进行转换。

Jackson2JsonMessageConverter 是最常用的 MessageConverter,顾名思义,它的作用是用来转换 JSON 格式消息的,而 JSON 又是我们做数据传输时,最常见的一种格式。

代码示例 🌰:

Order

@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Serializable {
    private String orderId;
    private Double price;
}

Order 为订单类,是我创建的一个实体类。

JSONUtils

public class JSONUtils {
    /**
     * 将 Java 对象序列化为 JSON 字符串
     *
     * @return
     */

    public static String objectToJson(Object obj) {

        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将 JSON 字符串反序列化为 Java 对象
     *
     * @param jsonStr
     * @return
     */

    public static Object jsonToObject(String jsonStr, Class<?> clazz) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(jsonStr, clazz);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

JSONUtils 是一个可以将对象与 JSON 进行相互转换的工具类。

Producer

@Service
@Slf4j
public class Producer {

    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";


    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        // String messageToSend = "test message";
        Order order = new Order().builder()
                .orderId("111")
                .price(888.8)
                .build();
        String json = JSONUtils.objectToJson(order);

        CorrelationData correlationData = new CorrelationData();
        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                json,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //  设置单条消息 TTL 为 1 min
                        MessageProperties messageProperties = message.getMessageProperties();
                        messageProperties.setContentType("application/json");
                        messageProperties.setExpiration("60000");
                        return message;
                    }
                },
                correlationData
        );
        log.info("message sent");
    }
}

Producer 为发送消息的生产者。代码中,首先,我创建了一个订单对象,并将订单对象转换为一个 JSON;然后,我将 JSON 作为消息体,并使用 rabbitTemplate.convertAndSend 方法进行消息发送。在方法内,我对 MessagePropertiesContentType 进行了设置,将其设置为 "application/json",这一点是需要注意的,如果没有设置 ContentType,由于发送的消息体为 String 字符串,默认的 ContentType 则默认为 "text/plain",消息就会被当作 String 字符串发送。

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig {
    
    final String QUEUE = "queue.test";
    final String EXCHANGE = "exchange.test";
    final String ROUTING_KEY = "key.test";
    
    @Resource
    private MessageDelegate messageDelegate;
    
    // ... ...

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听队列
        messageListenerContainer.setQueueNames(QUEUE);
        // 设置消费者线程数量
        messageListenerContainer.setConcurrentConsumers(3);
        // 设置最大的消费者线程数量
        messageListenerContainer.setMaxConcurrentConsumers(5);
        // 消费端开启手动确认
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 消费端限流
        messageListenerContainer.setPrefetchCount(20);
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        // 设置自定义方法名
        Map<String, String> map = new HashMap<>(8);
        map.put(QUEUE, "handle");
        messageListenerAdapter.setQueueOrTagToMethodName(map);
        // 设置代理
        messageListenerAdapter.setDelegate(messageDelegate);
        // 设置消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter("*");
        jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {
            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {

            }

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return Order.class;
            }
        });
        messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
        // 设置消息监听器
        messageListenerContainer.setMessageListener(messageListenerAdapter);
        return messageListenerContainer;
    }

}

RabbitConfig 配置类中,我为 messageListenerAdapter 设置了 Jackson2JsonMessageConverter 消息转换器。转换器可以通过设置 ClassMapper 指定 JSON 到具体类的映射。在代码中,我重写了 ClassMappertoClass 方法,使 JSON 类型的消息映射为 Order 类型,这样我们便可以在消息监听方法中,使用 Order 类型来接收消息。如果不设置 ClassMapper,那么即默认接收消息的类型为 Map

MessageDelegate

@Slf4j
@Component
public class MessageDelegate {

    public void handle(Order order) {
        log.info("invoke handle, msg : {}", order);
    }
}

由于我们在消息转换器中已经指定了消息从 JSON 到 Order 的映射,所以,消息监听方法 handle 的传参类型为 Order

启动 Spring Boot 项目,调用接口,消息收发成功,控制台输出内容如下:

2023-04-15 21:56:26.631  INFO 99952 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer    : message sent
2023-04-15 21:56:26.668  INFO 99952 --- [enerContainer-3] c.d.r.delegate.MessageDelegate           : invoke handle, msg : Order(orderId=111, price=888.8)
2023-04-15 21:56:26.682  INFO 99952 --- [nectionFactory1] c.d.r.config.RabbitConfig                : send msg to Broker success
2023-04-15 21:56:26.683  INFO 99952 --- [nectionFactory1] c.d.r.config.RabbitConfig                : correlationData : CorrelationData [id=deded32a-a28e-449e-8455-3a8f6ef5248b]

如果在你的项目中,需要自定义消息转换器,那便可以通过实现 MessageConverter 接口,重写 toMessagefromMessage 方法来完成。这部分内容就不再赘述了~

RabbitListener


RabbitListener 是 Spring AMQP 消费者监听消息的终极方案,它的使用方法非常简单,我们仅需在消息处理类或指定方法上添加 @RabbitListener 注解即可。

@RabbitListener 注解标注于方法之上时,该方法便是一个可用于消息监听的监听器;当 @RabbitListener 注解标注于类上,则需要同另一个注解 @RabbitHandler 一同配合使用。这种使用方式可用于实现监听不同队列,不同类型消息的处理,类中标注了 @RabbitHandler 的方法就是可用于消息监听的监听器。RabbitListener 基本上可以满足开发者 90% 以上的业务开发需求。

关于 RabbitListener 的基本使用方法,因为文章篇幅限制,就不再赘述了,我们来重点探讨它的原理。

RabbitListener 原理

1.RabbitAutoConfiguration

我们应该都知道 Spring Boot 自动配置的原理——当 Spring Boot 项目启动时,会通过 @EnableAutoConfiguration 注解定位到 META-INF/spring.factories 文件中,并获取所有 EnableAutoConfiguration 属性的值。这些值便是 Spring Boot 要执行的自动配置类。这些配置类通常以 XXXAutoConfiguration 这种形式来命名的。

RabbitAutoConfiguration 便是 Spring AMQP 的自动配置类:

我来解释一下 RabbitAutoConfiguration 配置类上这些注解的含义:

  1. @AutoConfiguration 注解就不必多说了,它是自动配置注解。
  2. @ConditionalOnClass 注解是一个条件注解,其含义为当 ClassPath 路径下有指定类时,才会去扫描解析当前的自动配置类;所以,只有 ClassPath 路径下存在 RabbitTemplateChannel 这两个类时,才会去扫描解析 RabbitAutoConfiguration 自动配置类。
  3. @EnableConfigurationProperties 注解的作用是使标注了 @ConfigurationProperties 注解的类生效;源代码的含义便是使 RabbitProperties 类生效,RabbitProperties 类上标注了 @ConfigurationProperties 注解,其作用是获取配置文件中所有以 spring.rabbitmq 的属性
  4. @Import 注解的作用是引入其他的配置类,源代码中,该注解引入了 RabbitAnnotationDrivenConfiguration 配置类。

接下来,我们进入到 RabbitAnnotationDrivenConfiguration 配置类:

我们看到,在 RabbitAnnotationDrivenConfiguration 类上也有一个 @ConditionalOnClass 注解,其含义为当 ClassPath 路径下有指定类 EnableRabbit 时,才会去扫描解析当前的配置类。进入到 EnableRabbit 类:

我们可以看到 EnableRabbit 引入了 RabbitListenerConfigurationSelector,像这种 XXXConfigurationSelector 的主要作用是收集需要导入的配置类,进入到 RabbitListenerConfigurationSelector

RabbitListenerConfigurationSelectorselectImports 方法返回了一个字符串数组,该字符串数组中有两个配置类名,分别是 RabbitBootstrapConfigurationMultiRabbitBootstrapConfiguration,这两个配置类便是 EnableRabbit 需要导入的配置类 ;进入到 RabbitBootstrapConfiguration 类中:

我们看到 RabbitBootstrapConfiguration 配置类的 registerBeanDefinitions 方法的逻辑就是向注册表注册两个 BeanDefinition 实例。

BeanDefinition 就是对 Bean 的一个定义,它保存了关于一个 Bean 的各种信息,注册的过程就是将 Bean 定义为 BeanDefinition,然后放入至 Spring 容器中。

而这两个 BeanDefinition 则是 RabbitListenerAnnotationBeanPostProcessorRabbitListenerEndpointRegistry

2.RabbitListenerAnnotationBeanPostProcessor

BeanPostProcessor 是 Spring 容器提供的一个重要接口。从字面的意思来看,BeanPostProcessor 翻译为 Bean 的后置处理器。当一个 Bean 实现了 BeanPostProcessor 接口,重写 postProcessBeforeInitializationpostProcessAfterInitialization 方法,就可以在 Bean 的初始化方法的前后,进行一些特殊的逻辑处理。

RabbitListenerAnnotationBeanPostProcessor 便实现了 BeanPostProcessor 接口。

先来看一下 RabbitListenerAnnotationBeanPostProcessorpostProcessBeforeInitialization 方法:

我们看到,在该 Bean 初始化之前,postProcessBeforeInitialization 方法并未对其作任何处理。

接着,我们来到 postProcessAfterInitialization 方法:

该方法中,重点的逻辑有三处,在上图中我已经使用红框圈出。

第一处逻辑是 buildMetadata 方法,该方法我就不带领大家一起深入跟踪了。

它的作用是对所有标注 @RabbitListener 注解的类与方法,以及标注了 @RabbitHandle 注解的方法进行获取解析,并封装到 TypeMetadata对象中,然后,将 TypeMetadata 对象交给 processAmqpListenerprocessMultiMethodListeners 方法处理。

processAmqpListener 会对标注了 @RabbitListener 注解的方法进行解析;processMultiMethodListeners 则会对标注了 @RabbitListener 注解的类中,标注了 @RabbitHandle 注解的方法进行解析。这两个方法的逻辑大体相同,我们就来看下 processAmqpListener 方法:

在该方法中,首先调用了 checkProxy 方法,其作用是对 JDK 动态代理的情况进行检查,检查代理的目标接口是否含有对应方法;然后,便创建了一个 MethodRabbitListenerEndpoint 对象,将该对象与其他参数一同传入 processListener 方法。

继续进入到 processListener 方法:

processListener 方法前面的逻辑很简单,它会对 @RabbitListener 注解的属性进行校验,并设置到 MethodRabbitListenerEndpoint 对象中,最后它将调用上图红框中的逻辑 :this.registrar.registerEndpointregistrarRabbitListenerEndpointRegistrar 类的对象。

3.RabbitListenerEndpointRegistrar

RabbitListenerEndpointRegistrar 是一个工具类,该类的说明中写道:RabbitListenerEndpointRegistrar 的作用是将 RabbitListenerEndpoint 注册到 RabbitListenerEndpointRegistry 中。

进入到 registerEndpoint 方法:

在该方法的逻辑中,首先会创建一个 AmqpListenerEndpointDescriptor对象,AmqpListenerEndpointDescriptor 是对 MethodRabbitListenerEndpointRabbitListenerContainerFactory 的封装。创建了该类的对象后,接着就来到了一个同步代码块中,由于 startImmediately 值为 false,所以同步代码块会走 this.endpointDescriptors.add(descriptor) 的逻辑。endpointDescriptorsAmqpListenerEndpointDescriptorList 列表。

RabbitListenerEndpointRegistrar 实现了 InitializingBean 接口,我们接下来看一下它的 afterPropertiesSet 初始化方法:

我们可以看到,在 afterPropertiesSet 初始化方法中,调用了 registerAllEndpoints 方法,registerAllEndpoints 方法中也有一个同步代码块,代码块中的逻辑为循环 endpointDescriptors 列表,然后调用 this.endpointRegistry.registerListenerContainer 这段逻辑;整个循环结束后,将 startImmediately 这一变量设置为 true

4.RabbitListenerEndpointRegistry

接着,我们来到 RabbitListenerEndpointRegistry 类的 registerListenerContainer 方法:

该方法的主要作用便是为已经注册的 RabbitListenerEndpoint 创建 MessageListenerContainer 实例。

RabbitListenerEndpointRegistry 实现了 SmartLifecycle 接口,那么自然地,我们就要去看一下它的 start 方法:

start 方法中调用了 startIfNecessary 方法,进入 startIfNecessary 方法:

最终,我们看到了,在 startIfNecessary 中,调用了 MessageListenerContainerstart 方法,而后面的内容,便衔接到了 SimpleMessageListenerContainer 原理的部分。

至此,我们也终于明白了 @RabbitListener 注解其原理的精妙所在。

总结

在今日的文章分享中,我向大家简单介绍了 Spring AMQP 中重要的核心组件以及这些组件的原理。

通过这篇文章,你需要知晓的内容有:

  1. 什么是消息队列?消息队列有什么作用?
  2. 什么是 AMQP 协议?Direct,Fanout,Topic 三种 Exchange 有什么区别?
  3. RabbitMQ 如何保证消息的可靠性?
  4. 请介绍一下 RabbitMQ 发送端的 Confirm 机制?
  5. 请介绍一下 RabbitMQ 消息返回机制?
  6. 请介绍一下 RabbitMQ 消费端的确认机制?
  7. 消费端的 basicConsume 方法中,将 autoAck 参数设置为 true 会有什么问题?
  8. 消费端的限流机制有什么作用?
  9. 如何为消息设置过期时间?
  10. 什么是死信队列?
  11. 哪些情况下,一条消息会被标记为死信?

好啦,至此为止,这篇文章就到这里了,Java 面试八股文系列后面依旧会继续更新并查缺补漏,感谢您的阅读与支持~~

欢迎大家关注我的公众号,在这里希望你可以收获更多的知识,我们下一期再见!

分类:

后端

标签:

Java

作者介绍

jrh
V1