
jrh
2023/04/17阅读:18主题:丘比特忙
Java 面试八股文之消息队列篇(二)
前言
封面是德国童话故事《The Duration of Life》的插页,它被格林兄弟收集于格林童话故事中,编号为 176。
这篇故事讲述的内容是,上帝在创造了世界后,为所有的生物都赋予了 30 年寿命。驴子觉得自己勤勤恳恳为人类服务,却换来人类对它的拳打脚踢,所以请求上帝让它少活几年,上帝便收回驴子 18 年的寿命;狗害怕衰老,因为衰老会让它咬不动食物,并且无法快乐地奔跑,便请求上帝让它少活几年,上帝便收回狗 12 年的寿命;猴子不想自己总是干着扮鬼脸逗人发笑的事儿,也请求上帝让它少活几年,上帝便收回了猴子 10 年的寿命。最后轮到人类,人类嫌弃 30 年寿命太短,请求上帝多赐予他们些生命。于是,上帝把从驴子那里拿来的 18 年赐给了人类,可是人类还是不满足;上帝又把从狗那里拿来的 12 年赐给人类,人类依旧不满足;最后,上帝将猴子多出来的 10 年也给了人类。这样,人的寿命便延长到了 70岁,起先 30 年是他们的本份,这个阶段人类健康、快乐,高兴地工作,生活也充满了欢乐;接下来是驴子的 18 年,在这个阶段,生活的负担压在肩上,人类虽然会辛勤地劳作,但是他们这种忠实的服务换来的却是别人的嫌弃与谩骂;然后是狗的 12 年,那时人类失去了牙齿,失去了健壮的骨骼,只能躺在床上哀怨;最后,就是猴子的 10 年,人类在最后的 10 年里,变得痴呆,糊里糊涂,成了孩子们捉弄、嘲笑的对象。
这是系列文章【 Java 面试八股文 】消息队列篇的第二期。
【 Java 面试八股文 】系列会陆续更新 Java 面试中的高频问题,旨在从问题出发,带你理解 Java 基础,数据结构与算法,数据库,常用框架等知识点。该系列前几期文章可以通过下方给出的链接进行查看~
往期文章
-
Java 面试八股文之基础篇(一) -
Java 面试八股文之基础篇(二) -
Java 面试八股文之基础篇(三) -
Java 面试八股文之数据结构与算法篇(一) -
Java 面试八股文之数据结构与算法篇(二) -
Java 面试八股文之数据结构与算法篇(三) -
Java 面试八股文之数据结构与算法篇(四) -
Java 面试八股文之设计模式篇(一) -
Java 面试八股文之设计模式篇(二) -
Java 面试八股文之设计模式篇(三) -
Java 面试八股文之数据库篇(一) -
Java 面试八股文之数据库篇(二) -
Java 面试八股文之数据库篇(三) -
Java 面试八股文之框架篇(一) -
Java 面试八股文之框架篇(二) -
Java 面试八股文之框架篇(三) -
Java 面试八股文之框架篇(四) -
Java 面试八股文之中间件篇(一) -
Java 面试八股文之中间件篇(二) -
Java 面试八股文之中间件篇(三) -
Java 面试八股文之中间件篇(四) -
Java 面试八股文之消息队列篇(一)
消息队列篇(二)
1. Spring-AMQP
Spring-AMQP 是 Spring 对 AMQP 协议的封装与扩展,它将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案中,使得开发者可以通过 Spring-AMQP 更简单方便地完成声明组件(队列,交换机等),收发消息等工作。
Spring-AMQP 是一个抽象层,不依赖于特定的 AMQP Broker 的实现,这样做的好处在于,可以使用户只针对抽象层来进行开发,而不用关心底层具体的实现是什么。
本篇文章内容基于 spring-boot-starter-amqp:2.7.5
创作。
2. RabbitAdmin
RabbitAdmin 的基本使用
RabbitAdmin
是 Spring-AMQP 中的核心组件。顾名思义,RabbitAdmin
是用来连接、配置与管理 RabbitMQ 的,其主要功能包括:
-
declareExchange
:创建交换机 -
deleteExchange
:删除交换机 -
declareQueue
:创建队列 -
deleteQueue
:删除队列 -
purgeQueue
:清空队列 -
declareBinding
:创建绑定关系 -
removeBinding
:删除绑定关系 -
... ...
来看一个 🌰:
Producer
@Service
@Slf4j
public class Producer {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
public void initRabbit() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 声明 Exchange
Exchange exchange = new DirectExchange(EXCHANGE, false, false);
// 声明 Queue
Queue queue = new Queue(QUEUE, false);
// 声明 Binding
Binding binding = new Binding(
QUEUE,
Binding.DestinationType.QUEUE,
EXCHANGE,
ROUTING_KEY,
null
);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
}
}
TestController
@RestController
public class TestController {
@Autowired
private Producer producer;
@GetMapping("/test")
public String test() {
producer.initRabbit();
return "success";
}
}
启动 Spring Boot 项目,调用接口,我们可以在 RabbitMQ 管控台看到 Exchange,Queue,Binding 声明及创建成功:

除了手动调用 RabbitAdmin
方法这种方式以外,我们还可以通过 Spring Boot Config 声明式地完成队列,交换机,绑定关系的创建。
Spring-AMQP 充分地发挥了 Spring Boot 的 Convention Over Configuration ,即:约定优于配置的特性。我们可以通过 Spring Boot Config 将 RabbitAdmin
交给 Spring 管理,并声明式地将队列,交换机,绑定关系注册为 Bean,Spring Boot 会为我们优雅地完成这些组件的创建:
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
/**
* 声明队列 queue.test
*
* @return
*/
@Bean
public Queue testQueue() {
return new Queue(QUEUE);
}
/**
* 声明交换机 exchange.test
*
* @return
*/
@Bean
public Exchange testExchange() {
return new DirectExchange(EXCHANGE);
}
/**
* 声明绑定关系
*
* @return
*/
@Bean
public Binding testBinding() {
return new Binding(QUEUE,
Binding.DestinationType.QUEUE,
EXCHANGE,
ROUTING_KEY,
null);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
如上面的代码所示,我们将 RabbitAdmin
注册为一个 Bean,交给 Spring 管理;并将 Exchange,Queue,Binding 都声明为了 Bean。
我们从 RabbitMQ 管控台中将队列,交换机删除后,启动 Spring Boot 项目。
项目启动完成,回到 RabbitMQ 管控台,我们可以发现,Spring Boot “神奇地”创建了我们在 Spring Boot Config 中声明的交换机,队列,以及绑定关系:

Spring-AMQP 是如何做到通过 Spring Boot Config 声明式创建 Exchang,Queue,Binding 的?
查看源代码,我们可以看到 RabbitAdmin
实现了多个接口,其中便有 ApplicationContextAware
与 InitializingBean
接口。

ApplicationContextAware
接口的作用就是获取应用上下文资源,实现了该接口的 Bean 便可以拿到 Spring 容器(ApplicationContext)。
而 InitializingBean
则是一个 Bean 的生命周期接口,对应一个 Bean 的初始化阶段。
在往期文章《Java 面试八股文之框架篇(二)》中,我向大家介绍了 Spring Bean 的完整生命周期,其中便有关于 InitializingBean
接口的解读。

该接口只有一个 afterPropertiesSet()
方法,当一个 Bean 实现了 InitializingBean
接口,那么在这个 Bean 的初始化阶段,便会自动调用 afterPropertiesSet()
方法,执行其初始化的逻辑。
我们跟随源码,来到 RabbitAdmin
实现的 afterPropertiesSet()
方法中,便会看到方法内有如下逻辑:

this.connectionFactory.addConnectionListener
该方法的作用是为 ConnectionFactory
添加连接监听器,一旦发现有连接,即会回调 Lambda 表达式内的逻辑。
进入到上图中红框圈出的 initialize()
方法:

由于 RabbitAdmin
实现了 ApplicationContextAware
接口,所以它可以获取到整个 Spring 上下文。在逻辑中,我们看到,它获取到了上下文中所有类型为 Exchange
,Queue
,Binding
的 Bean。

获取到这些 Bean 后,RabbitAdmin
便使用如上方式,对 Exchange,Queue,Binding 进行了声明与创建。
总结归纳:
-
RabbitAdmin
实现了ApplicationContextAware
接口与InitializingBean
接口 -
RabbitAdmin
在初始化方法afterPropertiesSet()
中,首先获取到 Spring 容器中,所有类型为Exchange
,Queue
,Binding
的 Bean,接着对其进行声明与创建;所以,我们可以通过 Spring Boot Config 声明式创建 Exchang,Queue,Binding 。
3. RabbitTemplate
RabbitTemplate 发送消息的方法:send 与 convertAndSend 的区别是什么?
在上文中,我们了解了 Spring-AMQP 的核心组件——RabbitAdmin
,知道了该如何使用 RabbitAdmin
连接配置客户端,并声明交换机,消息队列与绑定关系。本小节,我将向大家继续讲解 Spring-AMQP 另一个重要的核心组件——RabbitTemplate
。
RabbitTemplate
主要功能为收发消息,但是通常我们只使用其消息发送的功能。发送消息的方法为:
-
send
-
convertAndSend
先来看一下 send
方法的基本使用,示例代码如下:
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
/**
* 声明队列 queue.test
*
* @return
*/
@Bean
public Queue testQueue() {
return new Queue(QUEUE);
}
/**
* 声明交换机 exchange.test
*
* @return
*/
@Bean
public Exchange testExchange() {
return new DirectExchange(EXCHANGE);
}
/**
* 声明绑定关系
*
* @return
*/
@Bean
public Binding testBinding() {
return new Binding(QUEUE,
Binding.DestinationType.QUEUE,
EXCHANGE,
ROUTING_KEY,
null);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
我们依旧使用 Spring Boot Config,将 RabbitTemplate
注册为 Bean,交给 Spring 管理。
Producer
@Service
@Slf4j
public class Producer {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
String messageToSend = "test message";
MessageProperties messageProperties = new MessageProperties();
// 设置单条消息 TTL 为 1 min
messageProperties.setExpiration("60000");
Message message = new Message(messageToSend.getBytes(), messageProperties);
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.send(
EXCHANGE,
ROUTING_KEY,
message,
correlationData
);
log.info("message sent");
}
}
在 Producer
类中,我们使用了 rabbitTemplate.send
方法,发送了一条消息。
该方法(全参数重载)的第一个参数指定了交换机的名称;第二个参数为路由键名称;第三个参数接收一个 Message
对象:
Message message = new Message(messageToSend.getBytes(), messageProperties);
构建 Message
的第一个参数为消息体,消息体是一个 byte
数组,第二个参数为 MessageProperties
对象,该对象可以指定消息携带属性。示例代码中,我们指定了消息的 TTL,即失效时间为 1 分钟;send
方法的最后一个参数为一个 CorrelationData
对象,每一个发送的消息都要配备一个 CorrelationData
对象,该对象内部仅有一个 id 属性,用来表示当前消息的唯一性。
我们也可以手动指定这条消息唯一的 id,譬如:
CorrelationData correlationData = new CorrelationData(order.getID().toString()));
真实的业务场景中,我们一般会通过某种方式(譬如写入数据库)记录下这个 id,用来做纠错与对账。如果不指定 id,那么生成的 CorrelationData
对象将使用 UUID
来作为唯一 id。
TestController
@RestController
public class TestController {
@Autowired
private Producer producer;
@GetMapping("/test")
public String send() {
producer.sendMessage();
return "success";
}
}
启动 Spring Boot 项目,调用接口。我们可以在 RabbitMQ 管控台中看到,消息发送成功:

而 convertAndSend
也是 RabbitTemplate
发送消息的方法之一。
convertAndSend
翻译为“转换并发送”。send
方法接收一个 Message
对象,convertAndSend
方法则可以直接传入一个对象,该对象将会在发送到 RabbitMQ Brocker 之前,被转换为 Message
对象。
来看示例代码:
@Service
@Slf4j
public class Producer {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
String messageToSend = "test message";
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend(
EXCHANGE,
ROUTING_KEY,
messageToSend,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置单条消息 TTL 为 1 min
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("60000");
return message;
}
},
correlationData
);
log.info("message sent");
}
}
convertAndSend
方法(全参数重载)的第一个参数指定了交换机的名称;第二个参数为路由键名称;第三个参数接收一个 Object
对象,示例代码中,我传入的是一个 String
字符串,如果你愿意,可以传入任何对象,前提是该对象需要实现 Serializable
接口;第四个参数为一个 MessagePostProcessor
对象,MessagePostProcessor
的作用是让消息对象发送到 RabbitMQ Brocker 之前来操作消息对象,譬如为消息设置属性;最后一个参数为 CorrelationData
。
启动 Spring Boot 项目,调用接口。我们依旧可以在 RabbitMQ 管控台中看到,消息发送成功:

那么,convertAndSend
方法是如何做到将对象自动转换为消息对象的呢?
我们进入到 convertAndSend
方法:

可以看到 convertAndSend
的本质就是调用了 send
进行消息发送,不过它在内部调用了一个 convertMessageIfNecessary
方法,将我们传入的对象转换为了 Message
消息对象。
进入到 convertMessageIfNecessary
方法:

该方法首先会判断对象是否为 Message
类型,如果是,则强转并返回;如果不是,则会调用 getRequiredMessageConverter().toMessage()
方法。
getRequiredMessageConverter()
方法会返回一个 MessageConverter
对象,也就是说,convertAndSend
方法其实就是调用了 MessageConverter
对象的 toMessage
方法,将我们传入的对象转换为 Message
对象,并调用 send
方法进行消息发送。
关于 MessageConverter
这一组件,我会在稍后进行详细的介绍,先来总结一下 send
与 convertAndSend
的区别:
-
首先,二者均为 RabbitTemplate
发送消息的方法。send
方法指定我们传入一个Message
对象;而convertAndSend
方法则可以直接传递一个对象,传入的对象需实现Serializable
接口。 -
convertAndSend
方法的本质就是调用了MessageConverter
的toMessage
方法,将我们传入的对象转换为Message
对象,并调用send
方法进行消息发送。
使用 RabbitTemplate 开启消息确认机制与消息返回机制
关于 RabbitMQ 消息确认机制与消息返回机制的相关内容我就不再赘述了,对这两个知识点有疑问的童鞋,可以看上一篇文章《Java 面试八股文之消息队列篇(一)》进行复习~
使用 RabbitTemplate
开启消息确认与消息返回机制的方法十分简单,我们只需在配置类中进行配置即可,方法如下:
RabbitConfig
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 设置开启消息确认类型为 CORRELATED
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 设置开启消息返回
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启消息返回机制
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returned -> {
// 说明消息不可达
log.info("message:{}", returned.getMessage().toString());
log.info("replyCode:{}", returned.getReplyCode());
log.info("replyText:{}", returned.getReplyText());
log.info("exchange:{}", returned.getExchange());
log.info("routingKey:{}", returned.getRoutingKey());
});
// 开启消息确认机制
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("send msg to Broker success");
log.info("correlationData : {}", correlationData);
} else {
log.info("send msg to Broker fail");
log.info("cause : {}", cause);
}
});
return rabbitTemplate;
}
4. SimpleMessageListenerContainer
SimpleMessageListenerContainer 原理
1. 异步线程实现消息监听
上文中,我们学习了如何使用 RabbitAdmin
连接配置客户端,以及声明交换机,队列,绑定关系;也学习了如何使用 RabbitTemplate
发送消息。那么接下来,我们需要做的便是让消费者监听队列,并消费消息了。
在学习 Spring-AMQP 的消息监听容器之前,我们不妨先思考一下,如果让你去实现消息监听,你会怎么做?
首先,消息监听肯定是一个异步操作,所以,我的想法是通过异步线程池来管理异步线程,让异步线程去调用消费消息的方法;其次,为了能让异步线程持续监听消息队列,我们需要某种手段让其维持监听状态,最简单的方法就是 while(true) Thread.sleep()
。
具体代码如下:
AsyncTaskConfig
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置线程池的核心线程数
threadPoolTaskExecutor.setCorePoolSize(5);
// 设置线程池最大线程数
threadPoolTaskExecutor.setMaxPoolSize(10);
// 设置缓冲队列的长度
threadPoolTaskExecutor.setQueueCapacity(10);
// 设置线程池关闭时,是否要等待所有线程结束后再关闭
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// 设置线程池等待所有线程结束的时间
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
// 设置所有线程的前缀名称
threadPoolTaskExecutor.setThreadNamePrefix("Rabbit-Async-");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
AsyncTaskConfig
为线程池配置类,我在该配置类上使用了 @EnableAsync
注解,该注解的作用是使线程池执行器开启异步执行功能。
通常,@EnableAsync
注解与 @Async
注解合用,@Async
注解可以标注在方法上,也可以标注在类上。当 @Async
注解标注在方法上时,则指定该方法为异步执行;当标注在类上时,则指定该类上的所有方法均异步执行。当然,只有在线程池配置类中标注 @EnableAsync
注解,@Async
注解才会生效。
Consumer
@Service
@Slf4j
public class Consumer {
final String QUEUE = "queue.test";
@Autowired
private RabbitTemplate rabbitTemplate;
@Async
public void handleMessage() {
try (
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
) {
channel.basicConsume(QUEUE,
true,
deliverCallback,
consumerTag -> {
}
);
while (true) {
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("error message : {}", e.getMessage());
}
}
DeliverCallback deliverCallback = (this::handle);
private void handle(String consumerTag, Delivery message) {
String messageBody = new String(message.getBody());
log.info("receive message");
log.info("consumerTag : {}", consumerTag);
log.info("messageBody : {}", messageBody);
}
}
Consumer
为消费者,在 handleMessage
方法上,我使用 @Async
注解,指定该方法为异步执行。方法内,我使用了 channel.basicConsume
来消费消息,并使用了 while(true) Thread.sleep()
来维持监听状态。
RabbitmqTutorialApplication
@SpringBootApplication
public class RabbitmqTutorialApplication implements ApplicationRunner {
@Autowired
private Consumer consumer;
public static void main(String[] args) {
SpringApplication.run(RabbitmqTutorialApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
consumer.handleMessage();
}
}
RabbitmqTutorialApplication
为 Demo 工程的启动类,启动类实现了 ApplicationRunner
接口(或者可以实现 CommandLineRunner
接口),并重写了 run
方法,其作用为:使我们可以在启动类启动之后做一些事情,我们做的事情便是调用 Consumer
的异步方法 handleMessage
,实现对消息队列的监听。
RabbitConfig
与 Producer
等代码不变,为了节省文章篇幅就不再赘述了。
启动 Spring Boot 项目,调用接口。我们可以在控制台输出中看到,Producer
消息发送成功,Consumer
监听到了消息,并成功消费了消息;重复调用接口,Consumer
依旧能够监听到消息,并成功消费消息:
2023-04-06 23:30:14.655 INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-06 23:30:14.655 INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=fb01bfcb-ddc7-495b-8c6c-251911f154a7]
2023-04-06 23:30:14.656 INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer : receive message
2023-04-06 23:30:14.656 INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer : consumerTag : amq.ctag-0YmVWOIrKlKA85833g7VeQ
2023-04-06 23:30:14.656 INFO 143 --- [pool-1-thread-4] c.d.rabbitmqtutorial.service.Consumer : messageBody : test message
2023-04-06 23:30:16.528 INFO 143 --- [nio-8080-exec-3] c.d.rabbitmqtutorial.service.Producer : message sent
2023-04-06 23:30:16.529 INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-06 23:30:16.529 INFO 143 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=9bbf5139-76a3-4609-b03b-e73b9fbe5f7d]
2023-04-06 23:30:16.529 INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer : receive message
2023-04-06 23:30:16.529 INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer : consumerTag : amq.ctag-0YmVWOIrKlKA85833g7VeQ
2023-04-06 23:30:16.529 INFO 143 --- [pool-1-thread-5] c.d.rabbitmqtutorial.service.Consumer : messageBody : test message
2.SimpleMessageListenerContainer 的基本使用
在学习了异步线程调用这样一种逻辑后,我们来看一下 Spring-AMQP 提供的简单消息监听容器——SimpleMessageListenerContainer
。
SimpleMessageListenerContainer
的功能十分强大,其支持:
-
设置同时监听多个队列,自动启动,自动配置 RabbitMQ -
设置消费者数量(最大数量,最小数量,批量消费) -
设置消息确认模式,是否重回队列,异常捕获 -
设置是否独占,其他消费者属性等 -
设置具体的监听器,消息转换器等 -
支持动态设置,运行中修改监听器配置 -
... ...
来看示例代码:
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
// ... ...
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听队列
messageListenerContainer.setQueueNames(QUEUE);
// 设置消费者线程数量
messageListenerContainer.setConcurrentConsumers(3);
// 设置最大的消费者线程数量
messageListenerContainer.setMaxConcurrentConsumers(5);
// 消费端开启手动确认
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置消息监听器
messageListenerContainer.setMessageListener(new MyChannelAwareMessageListener());
// 消费端限流
messageListenerContainer.setPrefetchCount(20);
return messageListenerContainer;
}
}
MyChannelAwareMessageListener
@Slf4j
public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
log.info("receive message:{}", new String(message.getBody()));
// 消费端手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重回队列
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
我们依旧使用 Spring Boot Config 的方式,将 SimpleMessageListenerContainer
这一组件注册为 Bean,交给 Spring 管理。
在代码中,我设置了 SimpleMessageListenerContainer
监听的队列,消费者线程数量,并设置了消费端手动确认,开启了消费端限流;同时,我为其设置了自定义的消息监听器:MyChannelAwareMessageListener
,该消息监听器实现了 ChannelAwareMessageListener
接口。
我们通常会使用两种消息监听器的实现:
-
ChannelAwareMessageListener
-
MessageListener
这二者的不同点在于,MessageListener
只能获取到 Message
信息,而 ChannelAwareMessageListener
则可以获取到 Message
与 Channel
的信息,我们便可以通过 Channel
开启消费端手动确认等设置。
启动 Spring Boot 项目,调用接口。我们可以在控制台输出中看到,消息被成功监听且消费:
2023-04-08 23:01:08.442 INFO 7713 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer : message sent
2023-04-08 23:01:08.444 INFO 7713 --- [enerContainer-2] c.d.r.c.MyChannelAwareMessageListener : receive message:test message
2023-04-08 23:01:08.450 INFO 7713 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-08 23:01:08.450 INFO 7713 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=9a48e245-8930-4047-94c2-dbac8fdfb583]
那么,SimpleMessageListenerContainer
是如何做到的呢?接下来,便由我带大家一探究竟~
3. SimpleMessageListenerContainer 原理

上图便是 SimpleMessageListenerContainer
继承链的 UML 类图。我们可以看到,它最终实现了 Lifecycle
接口。Lifecycle
用于对一个 Bean 生命周期的控制操作,该接口共定义了三个方法:
public interface Lifecycle {
void start();
void stop();
boolean isRunning();
}
start/stop
对应启动与停止,isRunning
用于判断组件是否正在运行;当一个 Bean 实现了 Lifecycle
接口时,在 Bean 初始化完毕后,Spring 会自动调用该组件实现的 start
方法。
SimpleMessageListenerContainer
的 start
方法在父类 AbstractMessageListenerContainer
中实现:

可以看到,AbstractMessageListenerContainer
的 start
方法中,调用了 doStart
方法;而 SimpleMessageListenerContainer
对父类的 doStart
方法进行了重写:

我们顺着 doStart
的逻辑往下走,着重地看一下我在上图中用红框圈出的代码。
首先,在 doStart
方法内,调用了 initializeConsumers
方法,我们直观地理解该方法的含义为:对消费者进行初始化。
进入到 initializeConsumers
方法:

initializeConsumers
方法内调用了 createBlockingQueueConsumer
方法来创建消费者对象,消费者对象的类型为 BlockingQueueConsumer
;BlockingQueueConsumer
是 Spring AMQP 定义的消费者类,它具有自己的生命周期(包含 start
与 stop
方法)。
回到 doStart
方法。在初始化消费者对象后,代码里创建了一个存储类型为 AsyncMessageProcessingConsumer
的 HashSet
对象 processors
;所有的 BlockingQueueConsumer
对象重新包装成了 AsyncMessageProcessingConsumer
对象,然后被 add
进 processores
中,接着调用了 getTaskExecutor().execute(processor)
方法。
AsyncMessageProcessingConsumer
翻译为 “支持异步消息处理的消费者”;它是 SimpleMessageListenerContainer
的一个内部类,其实现了 Runnable
接口;而 getTaskExecutor()
方法将会返回一个 Executor
即:异步任务执行器,异步任务执行器的 execute
方法需要传入一个实现 Runnable
接口的“任务对象”,它会直接调用实现了 Runnable
接口的“任务对象”的 run
方法。
那么这段逻辑做的事情,翻译成人话就是:将 BlockingQueueConsumer
类型的消费者包装为实现了 Runnable
接口,可异步处理消息的 AsyncMessageProcessingConsumer
,并丢进异步线程池中执行。
我们进入 AsyncMessageProcessingConsumer
实现的 run
方法,直接来看代码中的核心逻辑:
// ... ...
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
// ... ...
核心逻辑中主要做了两个关键操作:
-
initialize
-
mainLoop
首先,我们来看一下 initialize
初始化操作中做了什么:

在初始化逻辑中,调用了 BlockingQueueConsumer
生命周期的 start
方法,进入 start
方法:

start
方法中,有两个主要操作;passiveDeclarations
方法的作用为校验监听队列是否存在,setQosAndCreateConsumers
方法用于为客户端设置 Qos
以及消息订阅。进入到 setQosAndCreateConsumers
方法,我们可以看到其内部调用了 consumeFromQueue
方法,进入到 consumeFromQueue
方法:

该方法使用了 channel.basicConsume
订阅消息,它向 RabbitMQ Broker 发送指令,就相当于告诉服务器,我已经准备好了,如果监听队列有消息,你就把它推送给我~
RabbitMQ Brocker 接收到客户端发送的指令后,便会向客户端反馈,它通过 Basic.Deliver
指令类型将消息推送给客户端,一条消息对应一个 Basic.Deliver
指令的反馈,客户端收到服务端返回的指令后,将通过 ChannelN
这个类的 processAsync
方法进行处理,最终,将会调用 BlockingQueueConsumer
内部类 InternalConsumer
的 handleDelivery
方法:

handleDelivery
方法的核心操作就是将向 Brocker 发送的消息 offer
至 BlockingQueueConsumer
对象存储的队列 queue
中。
看完了 initialize
方法做的事情后,我们来看第二个核心操作 mainLoop
。
mainLoop
外层通过一个 while
无限循环套用,它做的事情就是从队列 queue
拿消息,并经过一系列操作最终传递到用户实现的 MessageListener
中。

mainLoop
的核心逻辑为 receiveAndExecute
,receiveAndExecute
方法内部调用了 doReceiveAndExecute
方法,doReceiveAndExecute
有两处重要的逻辑:
-
consumer.nextMessage
-
executeListener

我们可以看到,nextMessage
方法的本质就是通过 queue.poll
来获取下一条消息的。
接着,我们进入到 executeListener
方法,不断跟踪,可以看到,该方法最终调用了 doInvokeListener
方法:

而 doInvokeListener
正是调用了我们实现的 MessageListener
的 onMessage
方法。
至此为止,我们也就清楚了 SimpleMessageListenerContainer
的原理,总结:
-
SimpleMessageListenerContainer
实现了Lifecycle
接口,在它完成初始化后,Spring 会自动调用它的start
方法;SimpleMessageListenerContainer
的父类实现了start
方法,而在start
方法中会调用SimpleMessageListenerContainer
的doStart
方法。 -
doStart
中做了两件事,首先就是调用了initializeConsumers
方法,对消费者进行初始化,生成了BlockingQueueConsumer
;然后,就是将BlockingQueueConsumer
类型的消费者包装为实现了Runnable
接口,可异步处理消息的AsyncMessageProcessingConsumer
,并丢进异步线程池中执行。 -
异步线程池会调用 AsyncMessageProcessingConsumer
的run
方法。run
方法中,首先就是调用了initialize
方法,initialize
主要的作用就是为客户端设置 Qos 以及消息订阅等操作,当 RabbitMQ 服务端收到客户端发送的指令后,会将消息推送给客户端,本质便是将消息缓存到队列中(queue.offer()
);第二个核心的操作就是mainLoop
操作,mainLoop
外层通过一个while
无限循环套用,它做的事情就是从队列queue
拿消息(queue.poll()
),并经过一系列操作最终传递并调用到用户实现的MessageListener
的onMessage
方法中。
5. MessageListenerAdapter
MessageListenerAdapter 的基本使用
MessageListenerAdapter
即:消息监听适配器。话不多说,我们先来看一下它的基本使用:
MessageDelegate
@Slf4j
@Component
public class MessageDelegate {
public void handleMessage(byte[] msgBody) {
log.info("invoke handleMessage,msgBody : {}", new String(msgBody));
}
}
首先,我定义了一个负责消息处理的代理类 MessageDelegate
,该类有一个方法 handleMessage
,方法传参为 byte
类型的数组。对消息处理的逻辑也很简单,只是打印日志。
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Resource
private MessageDelegate messageDelegate;
// ... ...
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听队列
messageListenerContainer.setQueueNames(QUEUE);
// 设置消费者线程数量
messageListenerContainer.setConcurrentConsumers(3);
// 设置最大的消费者线程数量
messageListenerContainer.setMaxConcurrentConsumers(5);
// 消费端开启手动确认
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(20);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
// 设置代理
messageListenerAdapter.setDelegate(messageDelegate);
// 设置消息监听器
messageListenerContainer.setMessageListener(messageListenerAdapter);
return messageListenerContainer;
}
}
在 RabbitConfig
配置类中,我将 MessageDelegate
组件注入,并通过如上方式进行了设置;我们可以看到,MessageListenerAdapter
对象可以被传入到 messageListenerContainer.setMessageListener()
方法中,这便说明了 MessageListenerAdapter
本质上就是消息监听器。
通过 UML 类图,也可以直观地看出,MessageListenerAdapter
实现了 MessageListener
接口:

启动 Spring Boot 项目,调用接口。可以在控制台输出中看到,我们自定义的 MessageDelegate
的 handleMessage
方法被成功调用了:
2023-04-10 19:33:17.038 INFO 50607 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer : message sent
2023-04-10 19:33:17.040 INFO 50607 --- [enerContainer-1] c.d.r.delegate.MessageDelegate : invoke handleMessage,msgBody : test message
2023-04-10 19:33:17.087 INFO 50607 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-10 19:33:17.087 INFO 50607 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=25d0b20d-4bbd-4ff7-9a54-41d9385bea83]
MessageListenerAdapter 原理
上文中,我介绍了 MessageListenerAdapter
的一个基本使用,大家可能会好奇:Spring AMQP 是如何找到并调用代理的 handleMessage
方法处理消息的?
我们知道,MessageListenerAdapter
本质就是一个消息监听器。在为 SimpleMessageListenerContainer
设置消息监听器后,它会回调用户实现的 onMessage
方法来处理消息,所以,MessageListenerAdapter
也必然实现了 onMessage
方法:

上图所示的代码,便是 MessageListenerAdapter
实现的 onMessage
方法,其中有两处重要的逻辑,我已经使用红框圈出,第一处为 getListenerMethodName
方法,第二处为 invokeListenerMethod
方法。
进入到 getListenerMethodName
方法:

我们看到,该方法首先会对 queueOrTagToMethodName
这个 HashMap
进行判断,如果该 HashMap
的 size
不为空,则会走 if
中的逻辑;由于我们的代码中未设置该参数,所以将会走 getDefaultListenerMethod
的逻辑。
getDefaultListenerMethod
方法直接返回一个 defaultListenerMethod
常量,这个常量便是 handleMessage
,而 handleMessage
便是 MessageListenerAdapter
默认调用的方法名:

第一处逻辑是通过 getListenerMethodName
方法获取到了监听方法名;那么第二处逻辑想必大家已经猜测到其含义了,invokeListenerMethod
方法会通过拿到的方法名,使用 org.springframework.util
包下的 MethodInvoker
工具类的 invoke
方法进行回调,其本质便是使用了 Java 的反射技术。因为篇幅限制,这里我就不带着大家一步一步地进行代码跟踪了~
也许有人会问,难道 MessageListenerAdapter
只能识别用户中介类的 handleMessage
方法么?我们是否可以自定义方法,让 Spring AMQP 进行调用呢?
答案当然是可以的,还记得 getListenerMethodName
中的逻辑么?该方法首先会对 queueOrTagToMethodName
这个 HashMap
进行判断,如果 HashMap
的 size
为空才会走 getDefaultListenerMethod
方法,所以,我们自然是可以通过设置 queueOrTagToMethodName
,来使 Spring AMQP 识别到我们自定义的消息监听方法,代码如下:
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Resource
private MessageDelegate messageDelegate;
// ... ...
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听队列
messageListenerContainer.setQueueNames(QUEUE);
// 设置消费者线程数量
messageListenerContainer.setConcurrentConsumers(3);
// 设置最大的消费者线程数量
messageListenerContainer.setMaxConcurrentConsumers(5);
// 消费端开启手动确认
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(20);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
// 设置自定义方法名
Map<String, String> map = new HashMap<>(8);
map.put(QUEUE, "handle");
messageListenerAdapter.setQueueOrTagToMethodName(map);
// 设置代理
messageListenerAdapter.setDelegate(messageDelegate);
// 设置消息监听器
messageListenerContainer.setMessageListener(messageListenerAdapter);
return messageListenerContainer;
}
}
在代码中,我通过 MessageListenerAdapter
对象的 setQueueOrTagToMethodName
方法设置了 queueOrTagToMethodName
这个 HashMap
,HashMap
的键为队列名称,值为方法名称,示例代码中规定 MessageListenerAdapter
回调的监听方法名为 handle
。
MessageDelegate
@Slf4j
@Component
public class MessageDelegate {
public void handleMessage(byte[] msgBody) {
log.info("invoke handleMessage,msgBody : {}", new String(msgBody));
}
public void handle(byte[] msgBody) {
log.info("invoke handle,msgBody : {}", new String(msgBody));
}
}
启动 Spring Boot 项目,调用接口。可以在控制台输出中看到,我们自定义的 MessageDelegate
的 handle
方法被成功调用了:
2023-04-11 15:40:12.566 INFO 56939 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer : message sent
2023-04-11 15:40:12.577 INFO 56939 --- [enerContainer-1] c.d.r.delegate.MessageDelegate : invoke handle,msgBody : test message
2023-04-11 15:40:12.599 INFO 56939 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-11 15:40:12.599 INFO 56939 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=faff2552-52bd-47a2-8e90-110ee6106d5f]
总结:
-
MessageListenerAdapter
本质就是一个消息监听器。 -
在 MessageListenerAdapter
实现的onMessage
方法中,首先会通过getListenerMethodName
来获取消息监听的方法名,默认为handleMessage
;在拿到方法名后,它便会通过反射,来回调方法。 -
我们可以通过设置 queueOrTagToMethodName
来自定义MessageListenerAdapter
回调的监听方法名。
6. MessageConverter
Jackson2JsonMessageConverter
在讲解 RabbitTemplate
时,我们已经简单地了解了 MessageConverter
这一组件了。
RabbitTemplate
的 convertAndSend
方法的本质便是调用了 MessageConverter
组件的 toMessage
方法,将我们传入的 Object
对象转换为 Message
对象,并调用 send
方法来进行消息发送。
MessageConverter
翻译为“消息转换器”。在之前,我们进行收发消息时,都会使用byte
数组作为消息体,但是,在真实的业务场景下,我们通常需要使用 Java 对象来作为消息体,这时,我们就可以使用 MessageConverter
在收发消息时,对消息进行转换。
Jackson2JsonMessageConverter
是最常用的 MessageConverter
,顾名思义,它的作用是用来转换 JSON 格式消息的,而 JSON 又是我们做数据传输时,最常见的一种格式。
代码示例 🌰:
Order
@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Serializable {
private String orderId;
private Double price;
}
Order
为订单类,是我创建的一个实体类。
JSONUtils
public class JSONUtils {
/**
* 将 Java 对象序列化为 JSON 字符串
*
* @return
*/
public static String objectToJson(Object obj) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
/**
* 将 JSON 字符串反序列化为 Java 对象
*
* @param jsonStr
* @return
*/
public static Object jsonToObject(String jsonStr, Class<?> clazz) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(jsonStr, clazz);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
}
JSONUtils
是一个可以将对象与 JSON 进行相互转换的工具类。
Producer
@Service
@Slf4j
public class Producer {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
// String messageToSend = "test message";
Order order = new Order().builder()
.orderId("111")
.price(888.8)
.build();
String json = JSONUtils.objectToJson(order);
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend(
EXCHANGE,
ROUTING_KEY,
json,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置单条消息 TTL 为 1 min
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setContentType("application/json");
messageProperties.setExpiration("60000");
return message;
}
},
correlationData
);
log.info("message sent");
}
}
Producer
为发送消息的生产者。代码中,首先,我创建了一个订单对象,并将订单对象转换为一个 JSON;然后,我将 JSON 作为消息体,并使用 rabbitTemplate.convertAndSend
方法进行消息发送。在方法内,我对 MessageProperties
的 ContentType
进行了设置,将其设置为 "application/json"
,这一点是需要注意的,如果没有设置 ContentType
,由于发送的消息体为 String
字符串,默认的 ContentType
则默认为 "text/plain"
,消息就会被当作 String
字符串发送。
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
final String QUEUE = "queue.test";
final String EXCHANGE = "exchange.test";
final String ROUTING_KEY = "key.test";
@Resource
private MessageDelegate messageDelegate;
// ... ...
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听队列
messageListenerContainer.setQueueNames(QUEUE);
// 设置消费者线程数量
messageListenerContainer.setConcurrentConsumers(3);
// 设置最大的消费者线程数量
messageListenerContainer.setMaxConcurrentConsumers(5);
// 消费端开启手动确认
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(20);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
// 设置自定义方法名
Map<String, String> map = new HashMap<>(8);
map.put(QUEUE, "handle");
messageListenerAdapter.setQueueOrTagToMethodName(map);
// 设置代理
messageListenerAdapter.setDelegate(messageDelegate);
// 设置消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter("*");
jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
@Override
public Class<?> toClass(MessageProperties properties) {
return Order.class;
}
});
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
// 设置消息监听器
messageListenerContainer.setMessageListener(messageListenerAdapter);
return messageListenerContainer;
}
}
在 RabbitConfig
配置类中,我为 messageListenerAdapter
设置了 Jackson2JsonMessageConverter
消息转换器。转换器可以通过设置 ClassMapper
指定 JSON 到具体类的映射。在代码中,我重写了 ClassMapper
的 toClass
方法,使 JSON 类型的消息映射为 Order
类型,这样我们便可以在消息监听方法中,使用 Order
类型来接收消息。如果不设置 ClassMapper
,那么即默认接收消息的类型为 Map
。
MessageDelegate
@Slf4j
@Component
public class MessageDelegate {
public void handle(Order order) {
log.info("invoke handle, msg : {}", order);
}
}
由于我们在消息转换器中已经指定了消息从 JSON 到 Order
的映射,所以,消息监听方法 handle
的传参类型为 Order
。
启动 Spring Boot 项目,调用接口,消息收发成功,控制台输出内容如下:
2023-04-15 21:56:26.631 INFO 99952 --- [nio-8080-exec-1] c.d.rabbitmqtutorial.service.Producer : message sent
2023-04-15 21:56:26.668 INFO 99952 --- [enerContainer-3] c.d.r.delegate.MessageDelegate : invoke handle, msg : Order(orderId=111, price=888.8)
2023-04-15 21:56:26.682 INFO 99952 --- [nectionFactory1] c.d.r.config.RabbitConfig : send msg to Broker success
2023-04-15 21:56:26.683 INFO 99952 --- [nectionFactory1] c.d.r.config.RabbitConfig : correlationData : CorrelationData [id=deded32a-a28e-449e-8455-3a8f6ef5248b]
如果在你的项目中,需要自定义消息转换器,那便可以通过实现 MessageConverter
接口,重写 toMessage
与 fromMessage
方法来完成。这部分内容就不再赘述了~
RabbitListener
RabbitListener
是 Spring AMQP 消费者监听消息的终极方案,它的使用方法非常简单,我们仅需在消息处理类或指定方法上添加 @RabbitListener
注解即可。
当 @RabbitListener
注解标注于方法之上时,该方法便是一个可用于消息监听的监听器;当 @RabbitListener
注解标注于类上,则需要同另一个注解 @RabbitHandler
一同配合使用。这种使用方式可用于实现监听不同队列,不同类型消息的处理,类中标注了 @RabbitHandler
的方法就是可用于消息监听的监听器。RabbitListener
基本上可以满足开发者 90% 以上的业务开发需求。
关于 RabbitListener
的基本使用方法,因为文章篇幅限制,就不再赘述了,我们来重点探讨它的原理。
RabbitListener 原理
1.RabbitAutoConfiguration
我们应该都知道 Spring Boot 自动配置的原理——当 Spring Boot 项目启动时,会通过 @EnableAutoConfiguration
注解定位到 META-INF/spring.factories 文件中,并获取所有 EnableAutoConfiguration
属性的值。这些值便是 Spring Boot 要执行的自动配置类。这些配置类通常以 XXXAutoConfiguration
这种形式来命名的。
RabbitAutoConfiguration
便是 Spring AMQP 的自动配置类:

我来解释一下 RabbitAutoConfiguration
配置类上这些注解的含义:
-
@AutoConfiguration
注解就不必多说了,它是自动配置注解。 -
@ConditionalOnClass
注解是一个条件注解,其含义为当 ClassPath 路径下有指定类时,才会去扫描解析当前的自动配置类;所以,只有 ClassPath 路径下存在RabbitTemplate
和Channel
这两个类时,才会去扫描解析RabbitAutoConfiguration
自动配置类。 -
@EnableConfigurationProperties
注解的作用是使标注了@ConfigurationProperties
注解的类生效;源代码的含义便是使RabbitProperties
类生效,RabbitProperties
类上标注了@ConfigurationProperties
注解,其作用是获取配置文件中所有以spring.rabbitmq
的属性 -
@Import
注解的作用是引入其他的配置类,源代码中,该注解引入了RabbitAnnotationDrivenConfiguration
配置类。
接下来,我们进入到 RabbitAnnotationDrivenConfiguration
配置类:

我们看到,在 RabbitAnnotationDrivenConfiguration
类上也有一个 @ConditionalOnClass
注解,其含义为当 ClassPath 路径下有指定类 EnableRabbit
时,才会去扫描解析当前的配置类。进入到 EnableRabbit
类:

我们可以看到 EnableRabbit
引入了 RabbitListenerConfigurationSelector
,像这种 XXXConfigurationSelector
的主要作用是收集需要导入的配置类,进入到 RabbitListenerConfigurationSelector
:
RabbitListenerConfigurationSelector
的 selectImports
方法返回了一个字符串数组,该字符串数组中有两个配置类名,分别是 RabbitBootstrapConfiguration
与 MultiRabbitBootstrapConfiguration
,这两个配置类便是 EnableRabbit
需要导入的配置类 ;进入到 RabbitBootstrapConfiguration
类中:

我们看到 RabbitBootstrapConfiguration
配置类的 registerBeanDefinitions
方法的逻辑就是向注册表注册两个 BeanDefinition
实例。
BeanDefinition
就是对 Bean
的一个定义,它保存了关于一个 Bean
的各种信息,注册的过程就是将 Bean
定义为 BeanDefinition
,然后放入至 Spring 容器中。
而这两个 BeanDefinition
则是 RabbitListenerAnnotationBeanPostProcessor
和 RabbitListenerEndpointRegistry
。
2.RabbitListenerAnnotationBeanPostProcessor
BeanPostProcessor
是 Spring 容器提供的一个重要接口。从字面的意思来看,BeanPostProcessor
翻译为 Bean 的后置处理器。当一个 Bean 实现了 BeanPostProcessor
接口,重写 postProcessBeforeInitialization
与 postProcessAfterInitialization
方法,就可以在 Bean 的初始化方法的前后,进行一些特殊的逻辑处理。
RabbitListenerAnnotationBeanPostProcessor
便实现了 BeanPostProcessor
接口。
先来看一下 RabbitListenerAnnotationBeanPostProcessor
的 postProcessBeforeInitialization
方法:

我们看到,在该 Bean 初始化之前,postProcessBeforeInitialization
方法并未对其作任何处理。
接着,我们来到 postProcessAfterInitialization
方法:

该方法中,重点的逻辑有三处,在上图中我已经使用红框圈出。
第一处逻辑是 buildMetadata
方法,该方法我就不带领大家一起深入跟踪了。
它的作用是对所有标注 @RabbitListener
注解的类与方法,以及标注了 @RabbitHandle
注解的方法进行获取解析,并封装到 TypeMetadata
对象中,然后,将 TypeMetadata
对象交给 processAmqpListener
与 processMultiMethodListeners
方法处理。
processAmqpListener
会对标注了 @RabbitListener
注解的方法进行解析;processMultiMethodListeners
则会对标注了 @RabbitListener
注解的类中,标注了 @RabbitHandle
注解的方法进行解析。这两个方法的逻辑大体相同,我们就来看下 processAmqpListener
方法:

在该方法中,首先调用了 checkProxy
方法,其作用是对 JDK 动态代理的情况进行检查,检查代理的目标接口是否含有对应方法;然后,便创建了一个 MethodRabbitListenerEndpoint
对象,将该对象与其他参数一同传入 processListener
方法。
继续进入到 processListener
方法:

processListener
方法前面的逻辑很简单,它会对 @RabbitListener
注解的属性进行校验,并设置到 MethodRabbitListenerEndpoint
对象中,最后它将调用上图红框中的逻辑 :this.registrar.registerEndpoint
,registrar
是 RabbitListenerEndpointRegistrar
类的对象。
3.RabbitListenerEndpointRegistrar
RabbitListenerEndpointRegistrar
是一个工具类,该类的说明中写道:RabbitListenerEndpointRegistrar
的作用是将 RabbitListenerEndpoint
注册到 RabbitListenerEndpointRegistry
中。
进入到 registerEndpoint
方法:

在该方法的逻辑中,首先会创建一个 AmqpListenerEndpointDescriptor
对象,AmqpListenerEndpointDescriptor
是对 MethodRabbitListenerEndpoint
与 RabbitListenerContainerFactory
的封装。创建了该类的对象后,接着就来到了一个同步代码块中,由于 startImmediately
值为 false
,所以同步代码块会走 this.endpointDescriptors.add(descriptor)
的逻辑。endpointDescriptors
是 AmqpListenerEndpointDescriptor
的 List
列表。
RabbitListenerEndpointRegistrar
实现了 InitializingBean
接口,我们接下来看一下它的 afterPropertiesSet
初始化方法:

我们可以看到,在 afterPropertiesSet
初始化方法中,调用了 registerAllEndpoints
方法,registerAllEndpoints
方法中也有一个同步代码块,代码块中的逻辑为循环 endpointDescriptors
列表,然后调用 this.endpointRegistry.registerListenerContainer
这段逻辑;整个循环结束后,将 startImmediately
这一变量设置为 true
。
4.RabbitListenerEndpointRegistry
接着,我们来到 RabbitListenerEndpointRegistry
类的 registerListenerContainer
方法:


该方法的主要作用便是为已经注册的 RabbitListenerEndpoint
创建 MessageListenerContainer
实例。
RabbitListenerEndpointRegistry
实现了 SmartLifecycle
接口,那么自然地,我们就要去看一下它的 start
方法:

start
方法中调用了 startIfNecessary
方法,进入 startIfNecessary
方法:

最终,我们看到了,在 startIfNecessary
中,调用了 MessageListenerContainer
的 start
方法,而后面的内容,便衔接到了 SimpleMessageListenerContainer
原理的部分。
至此,我们也终于明白了 @RabbitListener
注解其原理的精妙所在。
总结
在今日的文章分享中,我向大家简单介绍了 Spring AMQP 中重要的核心组件以及这些组件的原理。
通过这篇文章,你需要知晓的内容有:
-
什么是消息队列?消息队列有什么作用? -
什么是 AMQP 协议?Direct,Fanout,Topic 三种 Exchange 有什么区别? -
RabbitMQ 如何保证消息的可靠性? -
请介绍一下 RabbitMQ 发送端的 Confirm 机制? -
请介绍一下 RabbitMQ 消息返回机制? -
请介绍一下 RabbitMQ 消费端的确认机制? -
消费端的 basicConsume
方法中,将autoAck
参数设置为true
会有什么问题? -
消费端的限流机制有什么作用? -
如何为消息设置过期时间? -
什么是死信队列? -
哪些情况下,一条消息会被标记为死信?
好啦,至此为止,这篇文章就到这里了,Java 面试八股文系列后面依旧会继续更新并查缺补漏,感谢您的阅读与支持~~
欢迎大家关注我的公众号,在这里希望你可以收获更多的知识,我们下一期再见!
作者介绍
