高级消费队列RabbitMq入门
# 前言
说到消息队列,现在主流的很多,今天的主角 RabbitMQ,是一套基于 AMQP 协议的开源消息代理软件,编写语言是 Erlang。本文将讲解 RabbitMQ 入门,从安装到使用到代码。
# 1.什么是消息队列
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。通俗的来讲,消息队列就是生产者生产消息,消费者监听到消息做各自的业务操作,也就是消费消息的过程。
# 2.下载与安装
# 2.1安装 Erlang(由于 RabbitMQ 是基于 Erlang 的)
RabbitMQ 和 Erlang 的对应关系:
Erlang 下载地址:https://www.erlang.org/downloads
安装过程简单粗暴,一直 next 就行。
# 2.2 安装 RabbitMQ
安装:点 next 就行。
安装完成之后,配置 RabbitMQ,执行以下命令,启用 Web 管理插件:
rabbitmq-plugins enable rabbitmq_management
- 进入首页,用户名密码 guest/guest:
# 3.RabbitMQ 的工作原理
生产者发送/发布消息到代理。
消费者从代理那里接收消息。RabbitMQ 扮演代理中间件的角色。
当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机来发送。
交换机把消息分发到不同的队列里,消费者就能从监听的队列中消费消息。
# 4.六种消息模型
# 4.1简单模式(simple)
消息产生着将消息放入队列。
消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被消费后,自动从队列中删除。
缺点: 这种模式下消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。
应用场景: 客户端服务端模式的聊天程序。
代码:
(1)先定义一个简单的队列存储消息:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:
* @modified By:
* @version: $
*/
@Configuration
public class SimpleQueue {
/**
* 创建一个简单的队列,叫 hello
* @return
*/
@Bean
public Queue queue() {
return new Queue("hello");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(2) 定义消费者:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:消费者,监听 hello 队列
* @modified By:
* @version: $
*/
@Component
public class Consumer1 {
@RabbitListener(queues = "hello")
@RabbitHandler
public void receive(String msg){
System.out.println("Consumer1 收到消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(3)定义生产者:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:27
* @description:这里直接用 rest 接口来做生产者
* @modified By:
* @version: $
*/
@RestController
@RequestMapping("/")
public class RabbitRestController {
@Autowired
@Qualifier("fireRabbitTemplate")
private RabbitTemplate rabbitTemplate;
private final Logger log =LoggerFactory.getLogger(getClass());
@RequestMapping("/send")
public String send() {
String context = "hello==========" + new Date();
log.info("发送消息 : " + context);
//生产者,正在往 hello 这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称
this.rabbitTemplate.convertAndSend("hello", context);
return "success";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
(4) 前端发送 Rest 请求,看控制台效果
http://localhost:8090/send (opens new window)
# 4.2工作模式(work)
消息产生者将消息放入队列。生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢。
消费者 A,消费者 B,当然可以更多,同时监听同一个队列,消费者共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。
缺点: 高并发情况下,会产生某一个消息被多个消费者共同消费。
应用场景: 发红包。
代码:
work 模式我们只需要在简单模式的基础上添加一个消费者,也监听 hello 这个队列。
(1)添加消费者2:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:29
* @description:
* @modified By:
* @version: $
*/
@Component
public class Consumer2 {
@RabbitHandler
@RabbitListener(queues = "hello")
public void receive(String msg){
System.out.println("Consumer2 收到消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(2)前端发送 Rest 请求,看控制台效果
http://localhost:8090/send (opens new window)
发送四次请求,消费者 1 和 2 分别接收到两次。
# 4.3 发布订阅模式(publish/subscribe)
X 代表交换机 rabbitMQ 内部组件,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中。
消费者监听队列,对应消息队列的消费者拿到消息进行消费。
相关场景: 邮件群发、群聊天。
(1)定义交换机与队列:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:47
* @description:交换机-发布订阅模式
* @modified By:
* @version: $
*/
@Configuration
public class QueueExchange {
@Bean
public Queue queueA() {
return new Queue("queueA", true);
}
@Bean
public Queue queueB() {
return new Queue("queueB", true);
}
/**
* 创建一个 fanoutExchange 交换机
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 将 queueA 队列绑定到 fanoutExchange 交换机上面
*/
@Bean
Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
/**
* 将 queueB 队列绑定到 fanoutExchange 交换机上面
*/
@Bean
Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
(2)定义消费者:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者 3
* @modified By:
* @version: $
*/
@Component
public class Consumer3 {
@RabbitHandler
@RabbitListener(queues = "queueA")
public void receive(String msg){
System.out.println("Consumer3 收到消息:"+msg);
}
}
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者 4
* @modified By:
* @version: $
*/
@Component
public class Consumer4 {
@RabbitHandler
@RabbitListener(queues = "queueB")
public void receive(String msg){
System.out.println("Consumer4 收到消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
(3)定义生产者:
@RequestMapping("/sendExchange")
public String sendToExchange(){
String context = "exchange=======" + new Date();
log.info("发送消息 : " + context);
//生产者,正在往交换机发送消息,交换机会根据绑定的队列来发送(如果多个客户端监听同一个队列,只有一个能收到消息)
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
return "success";
}
2
3
4
5
6
7
8
9
(4) 前端发送 Rest 请求,看控制台效果
http://localhost:8090/sendExchange (opens new window)
这里消费者 3 和 4 都收到了消息,因为他们分别监听不同的两个队列。
# 4.4 路由模式(routing)
消息生产者将消息发送给交换机按照路由判断,路由是字符串,交换机根据路由的 key 去匹配,只有匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。
根据业务功能定义路由字符串。
从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
业务场景: 统一门户和子系统交互,每个子系统对应不同的业务处理,通过路由 key 分发不同的消息到对应子系统队列,完成消息消费。
(1)定义交换机与队列,绑定路由 key:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:路由模式
* @modified By:
* @version: $
*/
@Configuration
public class QueueRouter {
public static final String DIRECT_EXCHANGE = "directExchange";
public static final String QUEUE_DIRECT_A = "direct.A";
public static final String QUEUE_DIRECT_B = "direct.B";
/**
* 创建一个 direct 交换机
* @return
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
Queue queueDirectNameA() {
return new Queue(QUEUE_DIRECT_A);
}
/**
* 创建队列
* @return
*/
@Bean
Queue queueDirectNameB() {
return new Queue(QUEUE_DIRECT_B);
}
/**
* 将 direct.A 队列绑定到 directExchange 交换机中,使用 a.key 作为路由规则
* @param queueDirectNameA
* @param directExchange
* @return
*/
@Bean
Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("a.key");
}
/**
* 将 direct.B 队列绑定到 directExchange 交换机中,使用 b.key 作为路由规则
* @param queueDirectNameB
* @param directExchange
* @return
*/
@Bean
Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("b.key");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/
@Component
public class Consumer5 {
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
@RabbitHandler
public void receiveA(String msg){
System.out.println("Consumer5-direct-A 收到路由消息:"+msg);
}
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_B)
@RabbitHandler
public void receiveB(String msg){
System.out.println("Consumer5-direct-B 收到路由消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(3)定义生产者
@RequestMapping("/sendRouter")
public String sendToExchangeByRouter(){
String context = "exchange=======" + new Date();
log.info("发送路由消息 : " + context);
//生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用 a.key 作为路由规则来发送
this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
return "success";
}
2
3
4
5
6
7
8
(4)发送前端请求,看效果
http://localhost:8090/sendRouter (opens new window)
可以看到只有监听了 QUEUE_DIRECT_A 的消费者能收到消息,因为队列 A 使用的路由 key 为 a.key。
# 4.5 主题模式(topic)
注意:与路由模式的区别就是路由 key 可以是通配符,模糊匹配。交换机类型为 topic。
星号井号代表通配符。
星号代表多个单词,井号代表一个单词。
路由功能添加模糊匹配。
消息产生者产生消息,把消息交给交换机。
交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。
(1) 定义队列与 topic 交换机:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:主题模式
* @modified By:
* @version: $
*/
@Configuration
public class QueueTopic {
public static final String TOPIC_EXCHANGE = "topicExchange";
public static final String DIRECT_REGXA = "fire.topic.#";
public static final String DIRECT_REGXB = "fire.topic.b";
public static final String DIRECT_REGXC = "fire.topic.c";
public static final String QUEUE_TOPIC_A = "topic.A";
public static final String QUEUE_TOPIC_B = "topic.B";
public static final String QUEUE_TOPIC_C = "topic.C";
/**
* 创建一个 topic 交换机
* @return
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 创建队列
* @return
*/
@Bean
Queue queueTopicNameA() {
return new Queue(QUEUE_TOPIC_A);
}
@Bean
Queue queueTopicNameB() {
return new Queue(QUEUE_TOPIC_B);
}
@Bean
Queue queueTopicNameC() {
return new Queue(QUEUE_TOPIC_C);
}
/**
* 将 direct.A 队列绑定到 topicExchange 交换机中,使用 nr.topic.#作为路由规则
* @param queueTopicNameA
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(DIRECT_REGXA);
}
/**
* 将 direct.B 队列绑定到 topicExchange 交换机中,使用 nr.topic.b 作为路由规则
* @param queueTopicNameB
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(DIRECT_REGXB);
}
/**
* 将 direct.B 队列绑定到 topicExchange 交换机中,使用 nr.topic.c 作为路由规则
* @param queueTopicNameC
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessageTopicC(Queue queueTopicNameC, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameC).to(topicExchange).with(DIRECT_REGXC);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/
@Component
public class Consumer6 {
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_A)
@RabbitHandler
public void receiveA(String msg){
System.out.println("Consumer6-topic-A 收到路由消息:"+msg);
}
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_B)
@RabbitHandler
public void receiveB(String msg){
System.out.println("Consumer6-topic-B 收到路由消息:"+msg);
}
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_C)
@RabbitHandler
public void receiveC(String msg){
System.out.println("Consumer6-topic-C 收到路由消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
(3)定义生产者:
@RequestMapping("/sendTopic")
public String sendToExchangeByTopic(){
String context = "topic=======" + new Date();
log.info("发送 topic 消息 : " + context);
//生产者,正在往 topic 交换机发送消息,队列绑定了不同路由规则,交换机会使用 fire.topic.b 作为路由规则来发送
// 用 fire.topic.b 和 fire.topic.#作为路由 key 的队列都能收到消息
this.rabbitTemplate.convertAndSend(QueueTopic.TOPIC_EXCHANGE,"fire.topic.b", context);
return "success";
}
2
3
4
5
6
7
8
9
(4)前端发送 Rest 请求,看效果
和预期一样,A 和 B 都收到了消息。
# 4.6 RPC 模式
客户端启动时,它将创建一个匿名排他回调队列。
对于 RPC 请求,客户端发送一条消息,该消息具有两个属性:reply_to(设置为回调队列)和 correlation_id(设置为每个请求的唯一值)。
求被发送到 rpc_queue 队列。
RPC 工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它将使用 reply_to 字段中的队列来完成工作,并将消息和结果发送回客户端。
客户端等待回调队列上的数据。当出现一条消息时,它将检查 correlation_id 属性。如果它与请求中的值匹配,则将响应返回给应用程序。
注:此处来源于官网,这里只做简单介绍,详情可看官网。
https://www.rabbitmq.com/tutorials/tutorial-six-python.html (opens new window)
# 5.消息确认机制(ACK)
业务系统中,消息丢了怎么办,消息发送到哪了?我们通常需要一些消息补偿机制去处理这些问题。
消息确认分为两种,发送确认和接收确认
# 5.1消息发送确认
确认生产者将消息发送给交换机,交换机传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列。
(1)通过实现 ConfirmCallBack 接口确认消息发送到交换机
代码:
/**
* 如果消息到达交换机, 则 confirm 回调, ack = true
* 如果消息不到达交换机, 则 confirm 回调, ack = false
* 需要设置 spring.rabbitmq.publisher-confirm-type=correlated
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
log.info("收到回调:{}", ack == true ? "消息成功到达交换机" : "消息到达交换机失败");
if (!ack) {
log.info("correlationData:{}", correlationData.getId());
log.info("消息到达交换机失败原因:{}", cause);
// 根据业务逻辑实现消息补偿机制
}
});
2
3
4
5
6
7
8
9
10
11
12
13
(2)通过实现 ReturnCallback 接口确认消息从交换机发送到队列。
/**
* 消息从交换机到达队列成功, 则 returnedMessage 不回调
* 消息从交换机到达队列失败, 则 returnedMessage 回调
* 需要设置 spring.rabbitmq.publisher-returns=true
*/
rabbitTemplate.setReturnsCallback(returnedMessage->{
log.info("消息未到达队列,setReturnsCallback 回调");
log.info("消息报文:{}", new String(returnedMessage.getMessage().getBody()));
log.info("消息编号:{}", returnedMessage.getReplyCode());
log.info("描述:{}", returnedMessage.getReplyText());
log.info("交换机名称:{}", returnedMessage.getExchange());
log.info("路由名称:{}", returnedMessage.getRoutingKey());
// 根据业务逻辑实现消息补偿机制
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 5.2.消息接收确认
(1) 确认模式
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
2
3
4
5
需要配置:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
(2)代码
消费者确认:
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/
@Component
public class Consumer8 {
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
@RabbitHandler
public void receiveA(Message msg, Channel channel) throws IOException {
try {
//消息确认机制还可以起到限流作用,比如在接收到此条消息时休眠几秒钟
Thread.sleep(3000);
// 确认收到消息,消息将被队列移除
// false 只确认当前 consumer 一个消息收到,true 确认所有 consumer 获得的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer8-direct-A 收到确认消息:"+msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
生产者:
@RequestMapping("/sendAck")
@ResponseBody
public String sendAck() {
String context = "exchange=======" + new Date();
log.info("发送确认消息 : " + context);
//生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用 a.key 作为路由规则来发送
this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
return "success";
}
2
3
4
5
6
7
8
9
前端请求看效果:
http://localhost:8090/sendAck (opens new window)
当然还有其他,比如失败确认 basicNack、拒绝 basicReject、basicPublish 重新发布等。
# 6.Spring Boot 整合 RabbitMQ
(1)新建一个 Maven 工程:
勾选 Spring Web 和 RabbitMQ 的依赖,也可以建好工程自己添加,创建完成,来看看 POM 文件依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(2)配置 yml 或者 properties,我这里使用 properties。
server.port= 8090
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消息确认需要配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
#手动确认消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
2
3
4
5
6
7
8
9
(3)启动项目
到此,已经可以开始开发你自己的业务逻辑了。
# 7. 扩展面试思考题
消息基于什么传输?
如何避免消息重复投递或重复消费?
如何保证消息不丢失?
手动确认模式中,消息手动拒绝中如果 requeue 为 true 会重新放入队列,消费者处理过程中一直有异常情况下会导致入队—拒绝—入队的死循环,怎么处理?
参考:
https://www.rabbitmq.com/documentation.html (opens new window)