RabbitMQ
MQ 概述
-
概述
MQ全称 Message Queue(消息队列),是在消息传输过程中保存消息的容器。多用于分布式系统之间进行通信。消息队列解决的不是存放消息的队列的目的,
解决的是通信问题。 -
小结
MQ,消息队列,存储消息的中间件 - 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
- 发送方称为
生产者,接收方称为消费者
-
MQ的优势和劣势 -
优势
-
应用解耦:提高系统容错性和可维护性
如果库存系统发生错误,订单系统随着产生错误,导致整体容错性降低 加入中间件后,具有隔离性,降低耦合度 

系统的耦合性·越高,容错性就越低,可维护性就越低。
使用
MQ使得应用间解耦,提升容错性和可维护性。 -
异步提速:提升用户体验和系统吞吐量
吞吐量: 是指系统在单位时间内处理请求的数量。
-
削峰填谷: 提高系统稳定性
削峰填谷 
使用了
MQ之后, 限制消费消息的速度为 1000,这样一来,高峰期产生的数据势必会被积压在 MQ中, 高峰就被 削掉了,但是因为消息积压,在高峰期过后的一段时间内, 消费消息的速度还是会维持在 1000,直到消费完积压的消息,这就叫做 填谷
-
-
劣势
-
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦
MQ宕机,就会对业务造成影响。如何保证 MQ的高可用? -
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ进行异步调用。如何保证消息没有被重复消费? 怎么处理消息丢失情况? 如何保证消息传递的顺序性? -
一致性问题
A系统处理完业务,通过 MQ给 B、C、D三个系统发送消息,如果 B系统, C系统处理成功, D系统处理失败。如何保证消息数据处理的一致性?
-
-
-
使用
MQ应满足以下条件 - 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的
返回值应该为空,这才让明明下层的动作还没做,上层却当成功做完了继续往后走,即所谓异步成为了可能 - 容许短暂的不一致性
- 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入
MQ,管理MQ这些成本。
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的
RabbitMQ 简介
AMQP,即 (Advanced Message Queuing Protocol 高级消息队列协议) 是一个网络协议, 是应用层协议的一个开放标准, 为面向消息设计的中间件设计。基于此协议的客户端与消息中间件可传递消息, 并不受客户端 / 中间不同产品,不同的开发语言等条件的限制。
RabbitMQ 安装
Docker-安装-RabbitMQ
-
建议
docker安装 1
2
3
4
5
6
7
8
9
10
11
12# 文件名: docker-compose.yml
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq在
yml文件所在目录下执行 docker-compose up -d -
官网
独立安装
-
下载
3.6.5(github): https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_5下载方式 需要匹配 Erlang版本 

-
在线安装依赖环境
1
yum install -y build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
安装 Erlang
-
Erlanghttps://www.rabbitmq.com/which-erlang.html
根据版本信息可以到
github进行下载 -
上传文件
文件上传 
-
安装
1
2
3yum install -y socat
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
安装 RabbitMQ
-
安装
1
2
3
4
5# 安装
rpm -ivh *.rpm --force --nodeps socat-1.7.3.2-1.1.el7.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm安装与启动 
-
官网教程文档
https://www.rabbitmq.com/getstarted.html 
开启管理界面及配置
-
配置
1
2
3
4
5
6
7# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
vim +42 /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留 guest
{loopback_users, [guest]}, -
新建用户
-
查看当前拥有的角色
1
rabbitmqctl list_users;
-
创建账号
1
rabbitmqctl add_user admin(用户名) admin(密码)
-
设置用户角色
1
rabbitmqctl set_user_tags admin administrator
-
设置用户权限
1
2
3
4
5
6# ".*": 配置
# ".*": 写
# ".*": 读
rabbitmqctl set_permissions -p "/虚拟机名称" admin(用户) ".*" ".*" ".*"
# Eg:
rabbitmqctl set_permissions -p "/" admin(用户) ".*" ".*" ".*"
-
-
重启服务
1
service rabbitmq-server restart
-
访问测试
default port: 15672,以guest/guest登录 
启动
-
服务
1
2
3service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
基础操作
-
添加用户
(非必须) 添加用户,赋予超级管理员权限 
-
角色说明:
-
超级管理员
( administrator): 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略( policy)进行操作。 -
监控者
( monitoring): 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息 (进程数,内存使用情况,磁盘使用情况等) -
策略制定者
( policymaker): 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息 (上图红框标识的部分)。 -
普通管理者
( management): 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 -
其他: 无法登陆管理控制台,通常就是普通的生产者和消费者。
-
-
-
添加虚拟机
添加虚拟机 
-
添加可操作权限
权限 
-
配置文件配置
关于找不见配置文件问题 
-
配置流程
1
2
3
4
5
6# 进入默认的安装目录
cd /usr/share/doc/rabbitmq-server-3.6.5/
# 复制配置文件到指定目录
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 重启 rabbitmq 服务
service rabbitmq-server restart配置文件放入指定目录 
-
核心组件
Publisher-生产者: 发布消息到RabbitMQ中的 ExchangeConsumer-消费者: 监听RabbitMQ中的 Queue中的消息 Exchange-交换机: 和生产者建立连接并接受生产者的消息Queue-队列:Exchange会将消息分发到指定的 Queue,Queue和消费者进行交互 Routes: 路由: 交换机什么样的策略将消息发布到Queue
快速入门 (简单模式)
-
架构图
了解流程 
-
简单模型
P: producer(生产者) C: consumer(消费者)
-
需求: 使用简单模式完成消息传递
- 步骤
-
创建工程
(生产者、消费者) -
创建生产者
-
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project> -
创建测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56package com.example.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO: Producer 实现步骤
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数
// 设置连接主机,不设置时, 默认连接 localhost
factory.setHost("192.168.247.129");
// 默认Port: 5672
factory.setPort(5672);
// 设置虚拟机
factory.setVirtualHost("/coderitl");
/* 用户名和密码默认: guest */
factory.setUsername("coder-itl");
factory.setPassword("coder-itl");
// 3. 创建 Connection
Connection connection = factory.newConnection();
// 4. 创建 channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue
/**
* String queue: 队列名称
* boolean durable: 是否持久化,如果是的化,会持久化到自带的数据库中,重启 mq 之后, 还在
* boolean exclusive:
* 1. 是否独占.只能有一个消费者监听这个队列
* 2. 当 Connection 关闭时,是否删除队列
* boolean autoDelete: 是否自动删除。当没有consumer 时, 自动删除掉
* Map<String, Object> arguments: 参数
*
* 如果没有一个名为 hello_world 的队列,则会自动创建, 如果有, 则使用已存在的队列
*/
channel.queueDeclare("hello_world", true, false, false, null);
// 6. 发送消息到 Queue
String body = "hello rabbitmq................";
/**
* String exchange: 交换机名称.简单模式下会使用默认的 ""
* String routingKey: 路由名称
* AMQP.BasicProperties props: 配置信息
* byte[] body: 发送的消息数据
*/
channel.basicPublish("", "hello_world", null, body.getBytes());
// 7. 释放资源
channel.close();
connection.close();
}
} -
关于虚拟主机
启动时虚拟主机必须已经创建,
否则会出先找不见 /xxx虚拟主机错误 需要先行创建虚拟主机 
-
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71// 依赖: 消费者和生产者一样
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO: Consumer 实现步骤
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接参数
// 设置连接主机,不设置时, 默认连接 localhost
factory.setHost("192.168.247.129");
// 默认Port: 5672
factory.setPort(5672);
// 设置虚拟机
factory.setVirtualHost("/coderitl");
/* 用户名和密码默认: guest */
factory.setUsername("coder-itl");
factory.setPassword("coder-itl");
// 3. 创建 Connection
Connection connection = factory.newConnection();
// 4. 创建 channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue
/**
* String queue: 队列名称
* boolean durable: 是否持久化,如果是的化,会持久化到自带的数据库中,重启 mq 之后, 还在
* boolean exclusive:
* 1. 是否独占.只能有一个消费者监听这个队列
* 2. 当 Connection 关闭时,是否删除队列
* boolean autoDelete: 是否自动删除。当没有consumer 时, 自动删除掉
* Map<String, Object> arguments: 参数
*
* 如果没有一个名为 hello_world 的队列,则会自动创建, 如果有, 则使用已存在的队列
*/
channel.queueDeclare("hello_world", true, false, false, null);
// TODO: com.rabbitmq.client.Consumer;
// 创建一个 Consumer 对象,指明具体处理消息的程序
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法: 当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机, 路由 key..
* @param properties 配置信息
* @param body 发送方的数据
* @throws IOException
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("properties: " + properties);
// 字节转换为字符串
System.out.println("properties: " + new String(body));
}
};
/**
* String queue: 队列名称
* boolean autoAck: 是否自动确认
* Consumer callback: 回调对象
*/
channel.basicConsume("hello_world", true, consumer);
}
}
// TODO: 消费者处不能关闭资源1
2
3
4
5
6
7// 消息处理的其他方式
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 处理消息
System.out.println(new String(delivery.getBody()));
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});消费者 
-
-
-
分别添加依赖
-
编写生产者发送消息
-
编写消费者接受消息
-
- 步骤
RabbitMQ 工作模式
Work queues 工作队列模式
-
简单队列的问题
当多个消费者消费同一个队列时。这个时候
rabbitmq的公平调度机制就开启了, 于是, 无论消费者的消费能力如何, 每个消费者都能公平均匀分到相同数量的消息, 而不能出现能者多劳的情况。 能者多劳指的是: 假如
生产者产生了,100 个消息 消费者 1正常启动, 消费者 2在消费的过程中加入沉睡 300ms,真正的消费完毕是所有消费者消费完生产者的所产生的数据, 而没有实现 能者多劳(消费者1 早早消费完毕,却在等待消费者 2 进行消费) 1
2
3
4
5
6
7
8
9// 生产者
for (int i = 1; i <= 100; i++) {
// 5. 定义数据消息
String message = "I'm hello world rabbitmq module ................: " + i;
// 6. 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("消息发送完毕..................");
}1
2
3
4
5
6
7
8// 消费者 1
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 处理消息
System.out.println(new String(delivery.getBody()));
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});1
2
3
4
5
6
7
8
9
10
11
12
13// 消费者 2
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(300L);
// 处理消息
System.out.println(new String(delivery.getBody()));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
}); -
模式说明
Work queues
-
Work queues: 与入门程序的简单模式相比,多了一个或一些消费端, 多个消费端共同消费同一个队列中的消息 -
应用场景: 对于
任务过重或者任务较多情况使用工作队列可以提高任务的处理速度 -
消费记录
消费记录 (自动 ack)
-
实现: 在简单模式的基础上,
添加一个消费者 -
自动
ACK不管消费者有没有消费完就马上
ack给 broker,此时意味着broker会推送下一条消息给消费者 1
2
3
4
5
6
7
8
9
10
11
12
13// 消费者 2 在消费地过程中进行了睡眠,
由于自动 ack, 导致 broker 堆积, 消费者 1 的消费能力强, 但是消费者 2 却将整体拉长, 导致能力降低
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(300L);
// 处理消息
System.out.println(new String(delivery.getBody()));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});消费情况 
-
手动
ACK-解决上述问题当
broker收到手动的 ack后, 才会推送下一条消息给消费者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58// 生产者未发生变化
// 消费者 2 的改动
package com.example.consumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class WorkQueueConsumer2 {
public static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws Exception {
// 1. 想办法连接上 broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.3");
// 默认项(可以省略)
factory.setUsername("root");
factory.setPassword("root");
factory.setPort(5672);
// 设置虚拟主机
factory.setVirtualHost("/qf-mq");
// 2. 从连接工厂获得连接
Connection connection = factory.newConnection();
// 3. 获得 Channel
Channel channel = connection.createChannel();
// TODO: 声明一次只接受一条消息
channel.basicQos(1);
// 4. 声明队列
/**
* String queue, QUEUE_NAME
* boolean durable, false
* boolean exclusive, false
* boolean autoDelete, false
* Map<String, Object> arguments null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(300L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 处理消息
System.out.println(new String(delivery.getBody()));
// TODO: 手动 ACK
/**
* delivery.getEnvelope().getDeliveryTag(): 相当于 id
* false: 是否批量处理
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// TODO: 将自动 ACK 修改为 手动 ACK
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
}(消费者 2)实现能者多劳 (消费者 1)

Pub/Sub 订阅模式
-
模式说明
Pub/Sub 订阅模式 
-
P: 生产者,也就是要发送消息的那个程序,但是不再发送到队列中, 而是发送给 X(交换机) -
C: 消费者,消息的接收者, 会一直等待消息到来 -
Queue: 消息队列,接受消息,缓存消息 -
Exchange: 交换机(X). 一方面, 接受生产者发送的消息。另一方面, 知道如何处理消息, 例如递交给某个特别队列、递交给所有队列, 或是将消息丢弃。到底如何操作, 取决于 Exchange的类型。 Exchange有常见以下三种类型 Fanout: 广播,将消息交给所有绑定到交换机的队列 Direct: 定向,把消息交给符合指定 routing key的队列 Topic: 通配符,把消息交给副歌 routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息, 不具备存储消息的能力, 因此如果没有任何队列与 Exchange绑定, 或者没有符合路由规则的队列, 那么消息会丢失 -
实现:
fanout类型 -
工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41package com.example.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static String host;
private static int port;
private static String virtualHost;
private static String userName;
private static String password;
private static ConnectionFactory factory;
static {
host = "192.168.2.3";
port = 5672;
virtualHost = "/qf-mq";
userName = "root";
password = "root";
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setVirtualHost(virtualHost);
factory.setUsername(userName);
factory.setPassword(password);
}
/**
* 获取连接工厂
*
* @return
*/
public static ConnectionFactory getConnectionFactory() {
return factory;
}
public static Connection getConnection() throws Exception {
return factory.newConnection();
}
} -
fanout类型-生产者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.example.pubsub.producer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class PubSubproducer {
// fanout: 发布订阅模型
// 定义交换机名称
public static final String EXCHANGE_FANNOUT_NAME = "fannout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机 exchangeDeclare(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANGE_FANNOUT_NAME, "fanout");
// 生产消息,发送给交换机
for (int i = 1; i <= 10; i++) {
String message = "fanout data...: " + i;
channel.basicPublish(EXCHANGE_FANNOUT_NAME, "", null, message.getBytes("UTF-8"));
}
System.out.println("消息已发送...............");
// 释放资源
channel.close();
connection.close();
}
} -
消费者
-
消费者
11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.example.pubsub.consumer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class PubSubConsumer1 {
// 定义交换机和队列名称
public static final String EXCHANGE_FANNOUT_NAME = "fannout_exchange";
public static final String QUEUE_FANNOUT_NAME = "fannout_queue1";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列和交换机
channel.queueDeclare(QUEUE_FANNOUT_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_FANNOUT_NAME, "fanout");
// 将队列绑定到交换机上
channel.queueBind(QUEUE_FANNOUT_NAME, EXCHANGE_FANNOUT_NAME, "");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息处理 => 打印
System.out.println(new String(body));
}
};
// 消费者监听队列
channel.basicConsume(QUEUE_FANNOUT_NAME, true, consumer);
}
} -
消费者
21
2
3// 差异
public static final String QUEUE_FANNOUT_NAME = "fannout_queue2";实现模型 
消费者 1消费者 2

一份数据,
被两个消费者同时消费的实现
-
-
Routing 路由模式
-
模式说明
Routing
-
说明
- 队列与交换机的绑定,
不能是任意的绑定了,而是指定一个 RoutingKey(路由key) - 消息的发送方在向
Exchange发送消息时, 也必须指定消息的 RoutingKey Exchange不在把消息交给每一个绑定的队列, 而是根据消息的 Routing Key进行判断, 只有队列的 RoutingKey与消息的 RoutingKey完全一致, 才会接收到消息
- 队列与交换机的绑定,
-
根据路由规则
根据路由规则 
-
实现
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.example.routing.producer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class RoutingKeyProducer {
// 定义交换机名称
public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
// 发送消息
String message = "direct exchange about routingKey is apple...................";
channel.basicPublish(ROUTING_EXCHANGE_NAME, "apple", null, message.getBytes("UTF-8"));
System.out.println("消息已发送................");
channel.close();
connection.close();
}
} -
消费者
-
消费者
11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.example.routing.consumer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class RoutingKeyConsumer1 {
// 定义交换机名称
public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";
public static final String ROUTING_QUEUE_NAME_1 = "routing_direct_queue_1";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
channel.queueDeclare(ROUTING_QUEUE_NAME_1, false, false, false, null);
// 将队列绑定到交换机
channel.queueBind(ROUTING_QUEUE_NAME_1, ROUTING_EXCHANGE_NAME, "orange");
// 数据消费
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(ROUTING_QUEUE_NAME_1, true, consumer);
}
} -
消费者
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.example.routing.consumer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class RoutingKeyConsumer2 {
// 定义交换机名称
public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";
public static final String ROUTING_QUEUE_NAME_1 = "routing_direct_queue_2";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
channel.queueDeclare(ROUTING_QUEUE_NAME_1, false, false, false, null);
// 将队列绑定到交换机
channel.queueBind(ROUTING_QUEUE_NAME_1, ROUTING_EXCHANGE_NAME, "apple");
// 数据消费
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(ROUTING_QUEUE_NAME_1, true, consumer);
}
}绑定模型 输出 

-
-
Topics 模式
-
模式说明
根据通配符匹配 
-
通配符说明
*: 只能代表一个单词#:可以是多单词
-
实现
-
注意
通配符出现在消费者的绑定关系处1
channel.queueBind(MQContaint.TOPIC_QUEUE, MQContaint.TOPIC_EXCHANGE, "coderitl.#");
生产者处:
1
2channel.basicPublish(MQContaint.TOPIC_EXCHANGE, "coderitl.jd.order", null,message.getBytes("UTF-8"));
绑定后 
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.example.topics.producers;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class TopicsProducer {
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
// 发送消息
String message = "exchange is topic............";
channel.basicPublish(TOPIC_EXCHANGE_NAME, "product.order.coderitl", null, message.getBytes("UTF-8"));
System.out.println("消息已发送................");
channel.close();
connection.close();
}
} -
消费者
-
消费者
11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.example.topics.consumer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicsConsumer1 {
// 声明交换机和队列
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
public static final String TOPIC_QUEUE_NAME = "topic_queue_1";
public static final String ROUTING_KEY = "product.*";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
// TODO: 忘记了什么? => 绑定关系
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("*: " + new String(body));
}
};
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
}
} -
消费者
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.example.topics.consumer;
import com.example.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicsConsumer2 {
// 声明交换机和队列
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
public static final String TOPIC_QUEUE_NAME = "topic_queue_2";
public static final String ROUTING_KEY = "product.#";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
// TODO: 忘记了什么? => 绑定关系
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("#: " + new String(body));
}
};
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
}
}绑定关系 输出 

输出是生产者的
routing-key导致的结果
-
-
Spring-整合 RabbitMQ
-
生产者
-
创建生产者模块,
添加依赖 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-producers</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project> -
在
resources目录下创建属性配置文件 rabbitmq.properties1
2
3
4
5rabbitmq.host=192.168.2.3
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/spring-mq -
创建
rabbitmq的配置文件 spring-rabbitmq-producers.xml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="coderitl.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="coderitl.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itl.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans> -
MQ网页控制台 创建虚拟机并配置权限 
-
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32package com.example.producers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
public class ProducersTest {
private RabbitTemplate rabbitTemplate;
public void helloWorldTest() {
rabbitTemplate.convertAndSend("spring_queue", "hell world spring.....");
}
public void fanoutTest() {
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "fannout spring.....");
}
public void topicTest() {
rabbitTemplate.convertAndSend("spring_topic_exchange", "coderitl.hehe.haha", "topic spring haha.....");
rabbitTemplate.convertAndSend("spring_topic_exchange", "coderitl.hehe", "topic spring hehe.....");
}
}查看 
-
-
消费者
-
创建项目,
添加相同依赖 -
创建属性配置文件
-
创建
rabbitmq的配置文件 spring-rabbitmq-consumers.xml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.coderitl.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.coderitl.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.coderitl.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.coderitl.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.coderitl.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.coderitl.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.coderitl.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
// SpringQueueListener 其他类似
public class SpringQueueListener implements MessageListener {
// 重写 onMessage
public void onMessage(Message message) {
// 获取数据
System.out.println(new String(message.getBody()));
}
} -
测试
1
2
3
4
5
6
7
8
9
10
11
12// 只需要启动
public class ConsumersTest {
public void test() {
boolean flag = true;
while (flag) {
}
}
}
-
SpringBoot 整合 RabbitMQ
发布订阅模式
生产者
-
创建项目,
添加依赖 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> -
创建配置
1
2
3
4
5
6
7
8
9
10
11server:
port: 8079
spring:
rabbitmq:
host: 192.168.2.3
port: 5672
username: root
password: root
virtual-host: /spring-mq -
创建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyRabbitMQProducerConfig {
// TODO: 确定实现目标类型(发布订阅模型) 【fanout*,direct,topics】
public static final String EXCHANGE_NAME = "boot_fannout_exchange";
// 声明交换机
public FanoutExchange bootExchange() {
return new FanoutExchange(EXCHANGE_NAME, true, false);
}
} -
创建测试,
发送数据 1
2
3
4
5
6
7
8
9
10
11
12
13
class BootMqProducerApplicationTests {
private RabbitTemplate rabbitTemplate;
void sendMessage() throws UnsupportedEncodingException {
String message = "我是本次发送的数据..............";
rabbitTemplate.convertAndSend("boot_fannout_exchange", "", message.getBytes("UTF-8"));
}
}
消费者
-
创建消费者项目,
添加依赖, 和生产者一致 -
创建配置文件,
和生产者一致 -
创建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyRabbitMQConsumerConfig {
// TODO: 确定实现目标类型(发布订阅模型) 【fannout*,direct,topics】
public static final String EXCHANGE_NAME = "boot_fannout_exchange";
public static final String QUEUE_NAME = "boot_fannout_queue";
// 声明交换机
public FanoutExchange bootExchange() {
return new FanoutExchange(EXCHANGE_NAME, true, false);
}
// 声明队列
public Queue bootQueue() {
return new Queue(QUEUE_NAME, true, false, false);
}
// 声明绑定关系
public Binding QueueBindToExchange(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
} -
数据处理
1
2
3
4
5
6
7
8
public class BootMqConsumers {
// 监听队列: 当队列中有消息,则监听器工作, 处理接收到的消息
public void process(Message message) {
System.out.println(new String(message.getBody()));
}
}启动测试 
Topics 模式
生产者
-
生产者实现步骤
-
创建生产者
SpringBoot工程 -
引入依赖坐标
1
2
3
4
5<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.2</version>
</dependency> -
编写
yml配置, 基本信息配置 1
2
3
4
5
6
7
8
9
10
11
12spring:
rabbitmq:
# ip
host: 192.168.247.129
# port
port: 5672
# user
username: coder-itl
# password
password: coder-itl
# VirtualHost
virtual-host: /coderitl -
声明交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.example.config;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class TopicModuleProducerConfig {
// 交换机
public static final String TOPIC_EXCHANGE = "topic_exchange";
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, false, false);
}
} -
注入
RabbitTemplate,调用方法,完成消息的发送 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.example;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.UnsupportedEncodingException;
class BootMqTopicProducersApplicationTests {
public static final String TOPIC_EXCHANGE = "topic_exchange";
private RabbitTemplate rabbitTemplate;
void sendMessage() throws UnsupportedEncodingException {
String message = "boot send test data add..............";
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, "topic.module", message.getBytes("UTF-8"));
}
}
-
消费者
-
消费者实现步骤
-
创建消费者
SpringBoot工程 -
引入
starter1
2
3
4
5<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.2</version>
</dependency> -
编写
yml配置, 进行基本信息配置 1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.247.129
port: 5672
username: coder-itl
password: coder-itl
virtual-host: /coderitl -
声明交换机、队列以及绑定关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.example.config.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class TopicModuleConfig {
// 声明交换机和队列名称
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE = "topic_queue";
public static final String ROUTING_KEY = "topic.*";
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, false, false);
}
public Queue queue() {
return new Queue(TOPIC_QUEUE, false, false, false, null);
}
// 将队列绑定搭到交换机
public Binding queueToExchange(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
}
} -
定义监听类,
使用 @RabbitListener注解完成队列监听 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.example.config.consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
public class MyConsumer {
// 监听队列: 当队列中有消息,则监听器工作, 处理接收到的消息
public void process(Message message) {
System.out.println(new String(message.getBody()));
}
}获取到数据 绑定 

关于交换机的绑定可以在生产者处声明,以上大部分在消费者处声明,
将模型图看作两部分, 如下 关系划分 
-
手动 ACK 实现
-
消费端的配置文件中
添加如下1
2
3
4
5
6
7
8
9
10
11
12
13
14server:
port: 8089
spring:
rabbitmq:
host: 192.168.2.3
port: 5672
username: root
password: root
virtual-host: /spring-mq
listener:
simple:
acknowledge-mode: manual -
在消息处理最后进行手动确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package com.example.consumer;
// Channel: client 包下
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
public class BootMqConsumers {
// 监听队列: 当队列中有消息,则监听器工作, 处理接收到的消息
public void process(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
// 手动 ack 确认: 告知 broker 要签收的消息的 id(DeliveryTag()) false: 本次不批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
高级特性
消息的可靠性投递
-
问题描述
在使用
RabbitMQ的时候, 作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ为我们提供了两种方式用来控制消息的可靠性模式 confirm确认模式 return退回模式
-
rabbitmq整个消息投递的路径 投递的路径 
-
confirm: 消息从producer到 exchange则会返回一个 confirmCallback -
return: 消息从exchange到 queue投递失败则会返回一个 returnCallback我们将利用这两个
callback控制消息的可靠性投递
-
-
可靠性投递的三个保障
- 生产者把消息准确的投递给交换机
【通过 confirm 机制】 - 交换机准确无误的把消息投递给队列
【通过 return 机制】 - 队列准确无误的把消息推送给消费者
【将自动 ack 修改为 手动 ack】
- 生产者把消息准确的投递给交换机
-
durable-
durable是持久化 -
作用
当
rabbitmq重启时, 在交换机处设置持久化为 true时, rabbitmq重启时交换机依然会存在, 一样队列
-
Confirm 机制
-
SpringBoot中的实现 -
修改生产者端的配置文件
( ①)1
2
3
4
5
6
7
8
9spring:
rabbitmq:
host: 192.168.2.3
port: 5672
username: root
password: root
virtual-host: /spring-mq
# 添加如下部分
publisher-confirm-type: correlatedpublisher-confirm-type的取值 simple: 简单的执行ack的判断, 在发布消息成功后使用 rabbitmTemplate调用 waitForConfirms或 waitForConfirmsOrDie方法等待 broker节点返回发送结果, 根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDir方法如果返回 false则会关闭 channelcorrelated: 执行ack的时候还会携带数据 ( 消息的元数据)none: 禁用发布确认模式,默认的
-
编写一个
ConfirmCallback的实现类, 并注入到 rabbitTemplate1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package com.example.confirm;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
public class MyConfirmCollback implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
// 把监听器注入到 RabbitTemplate
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
* @param correlationData 消息的元数据
* @param ack 生产者发送消息到 broker,如果签收成功, 返回 True, 否则返回 False
* @param cause 当 ack=False时的失败的原因
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 获取消息的 id
String id = "";
if (Objects.nonNull(correlationData)) {
id = correlationData.getId();
}
if (ack) {
// 消息投递成功
System.out.println("消息投递成功.................: " + id);
} else {
// 消息投递失败,后续可以存入到缓存中,通过定时任务, 定时重发
System.out.println("消息投递失败.................: " + cause);
}
}
}
return 机制
-
修改生产者的配置文件
1
2
3
4
5
6
7
8
9
10
11spring:
rabbitmq:
host: 192.168.2.3
username: root
password: root
port: 5672
virtual-host: /coderitl
publisher-confirm-type: correlated
# 添加如下部分
publisher-returns: true -
实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62package com.example.confirm;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
public class MyConfirmCollback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private RabbitTemplate rabbitTemplate;
// 把监听器注入到 RabbitTemplate
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* @param correlationData 消息的元数据
* @param ack 生产者发送消息到 broker,如果签收成功, 返回 True, 否则返回 False
* @param cause 当 ack=False时的失败的原因
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 获取消息的 id
String id = "";
if (Objects.nonNull(correlationData)) {
id = correlationData.getId();
}
if (ack) {
// 消息投递成功
System.out.println("消息投递成功.................: " + id);
} else {
// 消息投递失败,后续可以存入到缓存中,通过定时任务, 定时重发
System.out.println("消息投递失败.................: " + cause);
}
}
/**
* 当消息没有传递到队列的时候,回调的方法
* ReturnedMessage:
* private final Message message;
* private final int replyCode;
* private final String replyText;
* private final String exchange;
* private final String routingKey;
*
* @param returnedMessage
*/
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息: " + new String(returnedMessage.getMessage().getBody()) + "没有成功投递到队列 4");
}
}returnCallback 错误的 routingKey

手动 ack,nack,reject 的区别
-
不做任何的
ackRabbitMQ会把消息标记成 unacked,此时 mq是在等待消费者进行 ack,如果消费者失去了会话, 此时消息会重新回到 ready状态, 被其他消费者消费 -
ack
确认签收,之后消息会从队列中剔除
-
reject
reject就是拒绝此消息 reject一次只支持处理一条消息。消息被拒绝掉值后, 并且 requeue设置成了 false,将会进入到 死信队列中。如果requeue设置成 true,将会重回到队列,但是这种情况很少使用。 1
2
3
4
5// Consumer 消费的回调处这几个方法
Consumer consumer = new DefaultConsumer(channel) {
channel.basicNack(参数...);
}; -
nack
nack和reject相同, 只是 nack支持批量处理多条消息
封装消息的元数据
保证消息的幂等性消费
-
什么是幂等性
幂等性: 多次操作造成的结果是一致的。对于非幂等的操作,
幂等性如何保证? =>【使用分布式锁】-
在请求方式中的幂等性的体现
get: 多次get结果是一致的 post: 添加,非幂等 put: 修改: 幂等,根据id修改 delete: 根据id删除, 幂等
对于非幂等的请求,我们在业务里要做幂等性保证
-
-
在消息队列中的幂等性体现
消息队列中,很可能一条消息被冗余部署的多个消费者收到,对于非幂等的操作,比如用户的注册,就需要做幂等性保证,否则消息将会被重复消费。使用分布式锁解决幂等性问题。
-
message的请求头中两个键值分别为 spring_listener_return_correlation: 该属性是用来确定消息被退回时调用哪个监听器spring_returned_message_correlation: 该属性是指退回待确认消息的唯一标识
-
业务代码中实现幂等性
-
publisher-confirm-type: correlated开启confirm请求头中将会使⽤ “ spring_returned_message_correlation”键来传递业务 id1
2# ⽣产者端修改配置⽂件
publisher-confirm-type: correlated -
⽣产者端传递业务
id1
2
3
4
5
6
7
8
9
10
11
12
13
public void testSendMessage() {
//业务 id
String id = UUID.randomUUID().toString();
//封装了业务 id 的消息元数据
CorrelationData correlationData = new CorrelationData(id);
//发送消息,并且携带消息的业务 id
rabbitTemplate.convertAndSend("my_boot_topic_exchange",
"product.add",
"hello message",
correlationData
);
} -
消费者端进⾏业务逻辑判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* 消费端的幂等性的实现
*/
public void processByMSG(Message message, Channel channel)
throws IOException {
//如何获得消息的业务 id
String messageId =
message.getMessageProperties().getHeader("spring_returned_message_correlation");
//设置分布式锁
Boolean lock =
redisTemplate.opsForValue().setIfAbsent(messageId, 1, 100000,TimeUnit.MILLISECONDS);
if (lock) {
//做消费
}
System.out.println("添加⽤户成功");
//⼿动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//不做消费
System.out.println("已重复消费");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
-
死信队列
-
死信队列的介绍
死信队列 ,让⼀条消息,在满⾜⼀定的条件下,成为死信,会被发送到另⼀个交换 机上,再被消费。 这个过程就是死信队列的作⽤。 死信队列就可以做出
“延迟” 队列的效果。⽐如,在订单超时未⽀付 ,将订单状态改 成 “已取消”,这个操作就可以使⽤死信队列来完成。设置消息的超时时间,当消息 超时则消息成为死信,于是通过监听死信队列的消费者来做取消订单的动作。 -
消息成为死信的条件
- 消息被拒签,并且没有重回队列,消息将成为死信。
- 消息过期了,消息将成为死信。
- 队列⻓度有限,存不下消息了,存不下的消息将会成为死信。