MQ 概述

  • 概述

    MQ 全称Message Queue(消息队列),是在消息传输过程中保存消息的容器。多用于分布式系统之间进行通信。

    消息队列解决的不是存放消息的队列的目的,解决的是通信问题。

  • 小结

    • MQ,消息队列,存储消息的中间件
    • 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
    • 发送方称为生产者,接收方称为消费者
  • MQ 的优势和劣势

    • 优势

      1. 应用解耦:提高系统容错性和可维护性

        如果库存系统发生错误,订单系统随着产生错误,导致整体容错性降低 加入中间件后,具有隔离性,降低耦合度

        系统的耦合性·越高,容错性就越低,可维护性就越低。

        使用MQ 使得应用间解耦,提升容错性和可维护性。

      2. 异步提速:提升用户体验和系统吞吐量

        吞吐量: 是指系统在单位时间内处理请求的数量。

      3. 削峰填谷: 提高系统稳定性

        削峰填谷

        使用了MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ 中,高峰就被掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷

    • 劣势

      1. 系统可用性降低

        系统引入的外部依赖越多,系统稳定性越差。一旦MQ 宕机,就会对业务造成影响。如何保证MQ 的高可用?

      2. 系统复杂度提高

        MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?如何保证消息传递的顺序性?

      3. 一致性问题

        A 系统处理完业务,通过MQ B、C、D 三个系统发送消息,如果B 系统,C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

  • 使用MQ 应满足以下条件

    1. 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成功做完了继续往后走,即所谓异步成为了可能
    2. 容许短暂的不一致性
    3. 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ 这些成本。

RabbitMQ 简介

  • AMQP(Advanced Message Queuing Protocol 高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息设计的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间不同产品,不同的开发语言等条件的限制。

RabbitMQ 安装

Docker-安装-RabbitMQ

  • 建议docker 安装

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 文件名: docker-compose.yml
    version: "3.1"
    services:
    rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
    - 5672:5672
    - 15672:15672
    volumes:
    - ./data:/var/lib/rabbitmq

    yml 文件所在目录下执行docker-compose up -d

  • 官网

    https://rabbitmq.com/

独立安装

安装 Erlang
安装 RabbitMQ
  • 安装

    1
    2
    3
    4
    5
    # 安装
    rpm -ivh *.rpm --force --nodeps socat-1.7.3.2-1.1.el7.x86_64.rpm

    # 安装
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    安装与启动
  • 官网教程文档

    https://www.rabbitmq.com/getstarted.html

开启管理界面及配置

  • 配置

    1
    2
    3
    4
    5
    6
    7
    # 开启管理界面
    rabbitmq-plugins enable rabbitmq_management
    # 修改默认配置信息
    # vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
    vim +42 /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
    # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
    {loopback_users, [guest]},
  • 新建用户

    • 查看当前拥有的角色

      1
      rabbitmqctl list_users;
    • 创建账号

      1
      rabbitmqctl add_user admin(用户名) admin(密码)
    • 设置用户角色

      1
      rabbitmqctl set_user_tags admin administrator
    • 设置用户权限

      1
      2
      3
      4
      5
      6
      # ".*": 配置 
      # ".*": 写
      # ".*": 读
      rabbitmqctl set_permissions -p "/虚拟机名称" admin(用户) ".*" ".*" ".*"
      # Eg:
      rabbitmqctl set_permissions -p "/" admin(用户) ".*" ".*" ".*"
  • 重启服务

    1
    service rabbitmq-server restart
  • 访问测试

    default port: 15672,以guest/guest 登录

启动

  • 服务

    1
    2
    3
    service rabbitmq-server start # 启动服务
    service rabbitmq-server stop # 停止服务
    service rabbitmq-server restart # 重启服务

基础操作

  • 添加用户(非必须)

    添加用户,赋予超级管理员权限
    • 角色说明

      1. 超级管理员(administrator): 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

      2. 监控者(monitoring): 可登陆管理控制台,同时可以查看rabbitmq 节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

      3. 策略制定者(policymaker): 可登陆管理控制台, 同时可以对policy 进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

      4. 普通管理者(management): 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

      5. 其他: 无法登陆管理控制台,通常就是普通的生产者和消费者。

  • 添加虚拟机

    添加虚拟机
  • 添加可操作权限

    权限
  • 配置文件配置

    关于找不见配置文件问题
    • 配置流程

      1
      2
      3
      4
      5
      6
      # 进入默认的安装目录
      cd /usr/share/doc/rabbitmq-server-3.6.5/
      # 复制配置文件到指定目录
      cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
      # 重启 rabbitmq 服务
      service rabbitmq-server restart
      配置文件放入指定目录

核心组件

  • Publisher-生产者: 发布消息到RabbitMQ 中的Exchange
  • Consumer-消费者: 监听RabbitMQ 中的Queue 中的消息
  • Exchange-交换机: 和生产者建立连接并接受生产者的消息
  • Queue-队列: Exchange 会将消息分发到指定的Queue,Queue 和消费者进行交互
  • Routes: 路由: 交换机什么样的策略将消息发布到Queue

快速入门(简单模式)

  • 架构图

    了解流程
  • 简单模型

    P: producer(生产者) C: consumer(消费者)
  • 需求: 使用简单模式完成消息传递

    • 步骤
      1. 创建工程(生产者、消费者)

        • 创建生产者

          • 添加依赖

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            26
            27
            28
            29
            30
            31
            32
            33
            34
            35
            36
            37
            38
            39
            40
            41
            42
            43
            44
            <?xml version="1.0" encoding="UTF-8"?>
            <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns="http://maven.apache.org/POM/4.0.0"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>org.example</groupId>
            <artifactId>rabbitmq-producer</artifactId>
            <version>1.0-SNAPSHOT</version>


            <dependencies>
            <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
            </dependency>
            <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            </dependency>
            <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
            </dependency>
            </dependencies>


            <build>
            <plugins>
            <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
            <source>1.8</source>
            <target>1.8</target>
            </configuration>
            </plugin>
            </plugins>
            </build>
            </project>
          • 创建测试

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            26
            27
            28
            29
            30
            31
            32
            33
            34
            35
            36
            37
            38
            39
            40
            41
            42
            43
            44
            45
            46
            47
            48
            49
            50
            51
            52
            53
            54
            55
            56
            package com.example.producer;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            import java.io.IOException;
            import java.util.concurrent.TimeoutException;

            public class HelloProducer {
            public static void main(String[] args) throws IOException, TimeoutException {
            // TODO: Producer 实现步骤
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2. 设置连接参数
            // 设置连接主机,不设置时,默认连接 localhost
            factory.setHost("192.168.247.129");
            // 默认Port: 5672
            factory.setPort(5672);
            // 设置虚拟机
            factory.setVirtualHost("/coderitl");
            /* 用户名和密码默认: guest */
            factory.setUsername("coder-itl");
            factory.setPassword("coder-itl");
            // 3. 创建 Connection
            Connection connection = factory.newConnection();
            // 4. 创建 channel
            Channel channel = connection.createChannel();
            // 5. 创建队列 Queue
            /**
            * String queue: 队列名称
            * boolean durable: 是否持久化,如果是的化,会持久化到自带的数据库中,重启 mq 之后,还在
            * boolean exclusive:
            * 1. 是否独占.只能有一个消费者监听这个队列
            * 2. 当 Connection 关闭时,是否删除队列
            * boolean autoDelete: 是否自动删除。当没有consumer 时,自动删除掉
            * Map<String, Object> arguments: 参数
            *
            * 如果没有一个名为 hello_world 的队列,则会自动创建,如果有,则使用已存在的队列
            */
            channel.queueDeclare("hello_world", true, false, false, null);
            // 6. 发送消息到 Queue
            String body = "hello rabbitmq................";
            /**
            * String exchange: 交换机名称.简单模式下会使用默认的 ""
            * String routingKey: 路由名称
            * AMQP.BasicProperties props: 配置信息
            * byte[] body: 发送的消息数据
            */
            channel.basicPublish("", "hello_world", null, body.getBytes());
            // 7. 释放资源
            channel.close();
            connection.close();
            }
            }

          • 关于虚拟主机

            启动时虚拟主机必须已经创建,否则会出先找不见/xxx 虚拟主机错误

            需要先行创建虚拟主机
          • 消费者

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            26
            27
            28
            29
            30
            31
            32
            33
            34
            35
            36
            37
            38
            39
            40
            41
            42
            43
            44
            45
            46
            47
            48
            49
            50
            51
            52
            53
            54
            55
            56
            57
            58
            59
            60
            61
            62
            63
            64
            65
            66
            67
            68
            69
            70
            71
            // 依赖: 消费者和生产者一样
            package com.example.consumer;

            import com.rabbitmq.client.*;

            import java.io.IOException;
            import java.util.concurrent.TimeoutException;

            public class HelloConsumer {
            public static void main(String[] args) throws IOException, TimeoutException {
            // TODO: Consumer 实现步骤
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2. 设置连接参数
            // 设置连接主机,不设置时,默认连接 localhost
            factory.setHost("192.168.247.129");
            // 默认Port: 5672
            factory.setPort(5672);
            // 设置虚拟机
            factory.setVirtualHost("/coderitl");
            /* 用户名和密码默认: guest */
            factory.setUsername("coder-itl");
            factory.setPassword("coder-itl");
            // 3. 创建 Connection
            Connection connection = factory.newConnection();
            // 4. 创建 channel
            Channel channel = connection.createChannel();
            // 5. 创建队列 Queue
            /**
            * String queue: 队列名称
            * boolean durable: 是否持久化,如果是的化,会持久化到自带的数据库中,重启 mq 之后,还在
            * boolean exclusive:
            * 1. 是否独占.只能有一个消费者监听这个队列
            * 2. 当 Connection 关闭时,是否删除队列
            * boolean autoDelete: 是否自动删除。当没有consumer 时,自动删除掉
            * Map<String, Object> arguments: 参数
            *
            * 如果没有一个名为 hello_world 的队列,则会自动创建,如果有,则使用已存在的队列
            */
            channel.queueDeclare("hello_world", true, false, false, null);
            // TODO: com.rabbitmq.client.Consumer;
            // 创建一个 Consumer 对象,指明具体处理消息的程序
            Consumer consumer = new DefaultConsumer(channel) {
            /**
            * 回调方法: 当收到消息后,会自动执行该方法
            * @param consumerTag 标识
            * @param envelope 获取一些信息,交换机,路由key..
            * @param properties 配置信息
            * @param body 发送方的数据
            * @throws IOException
            */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("consumerTag: " + consumerTag);
            System.out.println("Exchange: " + envelope.getExchange());
            System.out.println("RoutingKey: " + envelope.getRoutingKey());
            System.out.println("properties: " + properties);
            // 字节转换为字符串
            System.out.println("properties: " + new String(body));
            }
            };
            /**
            * String queue: 队列名称
            * boolean autoAck: 是否自动确认
            * Consumer callback: 回调对象
            */
            channel.basicConsume("hello_world", true, consumer);
            }
            }
            // TODO: 消费者处不能关闭资源

            1
            2
            3
            4
            5
            6
            7
            // 消息处理的其他方式
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 处理消息
            System.out.println(new String(delivery.getBody()));
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
            消费者
      2. 分别添加依赖

      3. 编写生产者发送消息

      4. 编写消费者接受消息

RabbitMQ 工作模式

Work queues 工作队列模式

  • 简单队列的问题

    当多个消费者消费同一个队列时。这个时候rabbitmq 的公平调度机制就开启了,于是, 无论消费者的消费能力如何,每个消费者都能公平均匀分到相同数量的消息,而不能出现能者多劳的情况。

    能者多劳指的是: 假如生产者产生了100个消息,消费者1 正常启动,消费者2 在消费的过程中加入沉睡300ms,真正的消费完毕是所有消费者消费完生产者的所产生的数据,而没有实现能者多劳(消费者1早早消费完毕,却在等待消费者2进行消费)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 生产者
    for (int i = 1; i <= 100; i++) {
    // 5. 定义数据消息
    String message = "I'm hello world rabbitmq module ................: " + i;
    // 6. 发送消息
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("消息发送完毕..................");
    }

    1
    2
    3
    4
    5
    6
    7
    8
    // 消费者 1
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    // 处理消息
    System.out.println(new String(delivery.getBody()));
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
    });
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 消费者 2
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
    Thread.sleep(300L);
    // 处理消息
    System.out.println(new String(delivery.getBody()));
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
    });
  • 模式说明

    Work queues
  • Work queues: 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

  • 应用场景: 对于任务过重或者任务较多情况使用工作队列可以提高任务的处理速度

  • 消费记录

    消费记录(自动ack)
  • 实现: 在简单模式的基础上,添加一个消费者

  • 自动ACK

    不管消费者有没有消费完就马上ack broker,此时意味着broker 会推送下一条消息给消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 消费者 2 在消费地过程中进行了睡眠,由于自动 ack,导致 broker 堆积,消费者 1 的消费能力强,但是消费者2却将整体拉长,导致能力降低
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
    Thread.sleep(300L);
    // 处理消息
    System.out.println(new String(delivery.getBody()));
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
    });
    消费情况
  • 手动ACK-解决上述问题

    broker 收到手动的ack 后,才会推送下一条消息给消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    // 生产者未发生变化
    // 消费者 2 的改动
    package com.example.consumer;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;

    public class WorkQueueConsumer2 {
    public static final String QUEUE_NAME = "hello_world";

    public static void main(String[] args) throws Exception {
    // 1. 想办法连接上 broker
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.2.3");
    // 默认项(可以省略)
    factory.setUsername("root");
    factory.setPassword("root");
    factory.setPort(5672);
    // 设置虚拟主机
    factory.setVirtualHost("/qf-mq");
    // 2. 从连接工厂获得连接
    Connection connection = factory.newConnection();
    // 3. 获得 Channel
    Channel channel = connection.createChannel();
    // TODO: 声明一次只接受一条消息
    channel.basicQos(1);
    // 4. 声明队列
    /**
    * String queue, QUEUE_NAME
    * boolean durable, false
    * boolean exclusive, false
    * boolean autoDelete, false
    * Map<String, Object> arguments null
    */
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
    Thread.sleep(300L);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    // 处理消息
    System.out.println(new String(delivery.getBody()));
    // TODO: 手动 ACK
    /**
    * delivery.getEnvelope().getDeliveryTag(): 相当于 id
    * false: 是否批量处理
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // TODO: 将自动 ACK 修改为 手动 ACK
    channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
    });
    }
    }

    (消费者2) 实现能者多劳(消费者 1)

Pub/Sub 订阅模式

  • 模式说明

    Pub/Sub 订阅模式
  • P: 生产者,也就是要发送消息的那个程序,但是不再发送到队列中,而是发送给X(交换机)

  • C: 消费者,消息的接收者,会一直等待消息到来

  • Queue: 消息队列,接受消息,缓存消息

  • Exchange: 交换机(X). 一方面,接受生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列,或是将消息丢弃。到底如何操作,取决于Exchange 的类型。Exchange 有常见以下三种类型

    • Fanout: 广播,将消息交给所有绑定到交换机的队列
    • Direct: 定向,把消息交给符合指定routing key 的队列
    • Topic: 通配符,把消息交给副歌routing pattern(路由模式)的队列

    Exchange(交换机) 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

  • 实现: fanout 类型

    • 工具类

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      package com.example.utils;

      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;

      public class RabbitMQUtils {
      private static String host;
      private static int port;
      private static String virtualHost;
      private static String userName;
      private static String password;
      private static ConnectionFactory factory;

      static {
      host = "192.168.2.3";
      port = 5672;
      virtualHost = "/qf-mq";
      userName = "root";
      password = "root";
      factory = new ConnectionFactory();
      factory.setHost(host);
      factory.setPort(port);
      factory.setVirtualHost(virtualHost);
      factory.setUsername(userName);
      factory.setPassword(password);
      }

      /**
      * 获取连接工厂
      *
      * @return
      */
      public static ConnectionFactory getConnectionFactory() {
      return factory;
      }

      public static Connection getConnection() throws Exception {
      return factory.newConnection();
      }
      }

    • fanout 类型-生产者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      package com.example.pubsub.producer;

      import com.example.utils.RabbitMQUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;

      public class PubSubproducer {
      // fanout: 发布订阅模型
      // 定义交换机名称
      public static final String EXCHANGE_FANNOUT_NAME = "fannout_exchange";

      public static void main(String[] args) throws Exception {
      Connection connection = RabbitMQUtils.getConnection();
      // 创建通道
      Channel channel = connection.createChannel();
      // 声明交换机 exchangeDeclare(交换机名称,交换机类型)
      channel.exchangeDeclare(EXCHANGE_FANNOUT_NAME, "fanout");
      // 生产消息,发送给交换机
      for (int i = 1; i <= 10; i++) {
      String message = "fanout data...: " + i;
      channel.basicPublish(EXCHANGE_FANNOUT_NAME, "", null, message.getBytes("UTF-8"));
      }
      System.out.println("消息已发送...............");
      // 释放资源
      channel.close();
      connection.close();
      }
      }

    • 消费者

      • 消费者1

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        package com.example.pubsub.consumer;

        import com.example.utils.RabbitMQUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        public class PubSubConsumer1 {
        // 定义交换机和队列名称
        public static final String EXCHANGE_FANNOUT_NAME = "fannout_exchange";
        public static final String QUEUE_FANNOUT_NAME = "fannout_queue1";

        public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列和交换机
        channel.queueDeclare(QUEUE_FANNOUT_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANNOUT_NAME, "fanout");
        // 将队列绑定到交换机上
        channel.queueBind(QUEUE_FANNOUT_NAME, EXCHANGE_FANNOUT_NAME, "");
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 消息处理 => 打印
        System.out.println(new String(body));
        }
        };
        // 消费者监听队列
        channel.basicConsume(QUEUE_FANNOUT_NAME, true, consumer);
        }
        }

      • 消费者2

        1
        2
        3
        // 差异
        public static final String QUEUE_FANNOUT_NAME = "fannout_queue2";

        实现模型
        消费者1 消费者2

        一份数据,被两个消费者同时消费的实现

Routing 路由模式

  • 模式说明

    Routing
  • 说明

    • 队列与交换机的绑定,不能是任意的绑定了,而是指定一个RoutingKey(路由key)
    • 消息的发送方在向Exchange 发送消息时,也必须指定消息的RoutingKey
    • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的RoutingKey 与消息的RoutingKey 完全一致,才会接收到消息
  • 根据路由规则

    根据路由规则
  • 实现

    • 生产者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      package com.example.routing.producer;

      import com.example.utils.RabbitMQUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;

      public class RoutingKeyProducer {
      // 定义交换机名称
      public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";

      public static void main(String[] args) throws Exception {
      Connection connection = RabbitMQUtils.getConnection();
      Channel channel = connection.createChannel();
      // 声明交换机
      channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
      // 发送消息
      String message = "direct exchange about routingKey is apple...................";
      channel.basicPublish(ROUTING_EXCHANGE_NAME, "apple", null, message.getBytes("UTF-8"));

      System.out.println("消息已发送................");
      channel.close();
      connection.close();
      }
      }

    • 消费者

      • 消费者1

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        package com.example.routing.consumer;

        import com.example.utils.RabbitMQUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        public class RoutingKeyConsumer1 {
        // 定义交换机名称
        public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";
        public static final String ROUTING_QUEUE_NAME_1 = "routing_direct_queue_1";

        public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机和队列
        channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
        channel.queueDeclare(ROUTING_QUEUE_NAME_1, false, false, false, null);
        // 将队列绑定到交换机
        channel.queueBind(ROUTING_QUEUE_NAME_1, ROUTING_EXCHANGE_NAME, "orange");
        // 数据消费
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body));
        }
        };
        channel.basicConsume(ROUTING_QUEUE_NAME_1, true, consumer);
        }
        }

      • 消费者2

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        package com.example.routing.consumer;

        import com.example.utils.RabbitMQUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        public class RoutingKeyConsumer2 {
        // 定义交换机名称
        public static final String ROUTING_EXCHANGE_NAME = "routing_direct_exchange";
        public static final String ROUTING_QUEUE_NAME_1 = "routing_direct_queue_2";

        public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机和队列
        channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, "direct");
        channel.queueDeclare(ROUTING_QUEUE_NAME_1, false, false, false, null);
        // 将队列绑定到交换机
        channel.queueBind(ROUTING_QUEUE_NAME_1, ROUTING_EXCHANGE_NAME, "apple");
        // 数据消费
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body));
        }
        };
        channel.basicConsume(ROUTING_QUEUE_NAME_1, true, consumer);
        }
        }

        绑定模型 输出

Topics 模式

  • 模式说明

    根据通配符匹配
  • 通配符说明

    • *: 只能代表一个单词
    • #:可以是多单词
  • 实现

    • 注意

      通配符出现在消费者的绑定关系处

      1
      channel.queueBind(MQContaint.TOPIC_QUEUE, MQContaint.TOPIC_EXCHANGE, "coderitl.#");

      生产者处:

      1
      2
      channel.basicPublish(MQContaint.TOPIC_EXCHANGE, "coderitl.jd.order", null,message.getBytes("UTF-8"));

      绑定后
    • 生产者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      package com.example.topics.producers;

      import com.example.utils.RabbitMQUtils;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;

      public class TopicsProducer {
      public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";

      public static void main(String[] args) throws Exception {
      Connection connection = RabbitMQUtils.getConnection();
      Channel channel = connection.createChannel();
      // 声明交换机
      channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
      // 发送消息
      String message = "exchange is topic............";
      channel.basicPublish(TOPIC_EXCHANGE_NAME, "product.order.coderitl", null, message.getBytes("UTF-8"));
      System.out.println("消息已发送................");
      channel.close();
      connection.close();
      }
      }

    • 消费者

      • 消费者1

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        package com.example.topics.consumer;

        import com.example.utils.RabbitMQUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        public class TopicsConsumer1 {
        // 声明交换机和队列
        public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
        public static final String TOPIC_QUEUE_NAME = "topic_queue_1";
        public static final String ROUTING_KEY = "product.*";

        public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TOPIC_QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
        // TODO: 忘记了什么? => 绑定关系
        channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, ROUTING_KEY);
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("*: " + new String(body));
        }
        };
        channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);

        }
        }

      • 消费者2

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        package com.example.topics.consumer;

        import com.example.utils.RabbitMQUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        public class TopicsConsumer2 {
        // 声明交换机和队列
        public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
        public static final String TOPIC_QUEUE_NAME = "topic_queue_2";
        public static final String ROUTING_KEY = "product.#";

        public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TOPIC_QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic");
        // TODO: 忘记了什么? => 绑定关系
        channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, ROUTING_KEY);
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("#: " + new String(body));
        }
        };
        channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);

        }
        }

        绑定关系 输出

        输出是生产者的routing-key 导致的结果

Spring-整合RabbitMQ

  • 生产者

    • 创建生产者模块,添加依赖

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://maven.apache.org/POM/4.0.0"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.example</groupId>
      <artifactId>spring-producers</artifactId>
      <version>1.0-SNAPSHOT</version>

      <dependencies>
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.1.7.RELEASE</version>
      </dependency>
      <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.1.8.RELEASE</version>
      </dependency>
      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      </dependency>
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>5.1.7.RELEASE</version>
      </dependency>
      </dependencies>

      <build>
      <plugins>
      <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.0</version>
      <configuration>
      <source>1.8</source>
      <target>1.8</target>
      </configuration>
      </plugin>
      </plugins>
      </build>
      </project>
    • resources 目录下创建属性配置文件rabbitmq.properties

      1
      2
      3
      4
      5
      rabbitmq.host=192.168.2.3
      rabbitmq.port=5672
      rabbitmq.username=root
      rabbitmq.password=root
      rabbitmq.virtual-host=/spring-mq
    • 创建rabbitmq 的配置文件spring-rabbitmq-producers.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      xmlns="http://www.springframework.org/schema/beans"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/context
      https://www.springframework.org/schema/context/spring-context.xsd
      http://www.springframework.org/schema/rabbit
      http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
      <!--加载配置文件-->
      <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>


      <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
      port="${rabbitmq.port}"
      username="${rabbitmq.username}"
      password="${rabbitmq.password}"
      virtual-host="${rabbitmq.virtual-host}"/>
      <!--定义管理交换机、队列-->
      <rabbit:admin connection-factory="connectionFactory"/>

      <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
      默认交换机类型为direct,名字为:"",路由键为队列的名称
      -->
      <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

      <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
      <!--定义广播交换机中的持久化队列,不存在则自动创建-->
      <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

      <!--定义广播交换机中的持久化队列,不存在则自动创建-->
      <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

      <!--定义广播类型交换机;并绑定上述两个队列-->
      <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
      <rabbit:bindings>
      <rabbit:binding queue="spring_fanout_queue_1"/>
      <rabbit:binding queue="spring_fanout_queue_2"/>
      </rabbit:bindings>
      </rabbit:fanout-exchange>


      <!--定义广播交换机中的持久化队列,不存在则自动创建-->
      <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
      <!--定义广播交换机中的持久化队列,不存在则自动创建-->
      <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
      <!--定义广播交换机中的持久化队列,不存在则自动创建-->
      <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

      <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
      <rabbit:bindings>
      <rabbit:binding pattern="coderitl.*" queue="spring_topic_queue_star"/>
      <rabbit:binding pattern="coderitl.#" queue="spring_topic_queue_well"/>
      <rabbit:binding pattern="itl.#" queue="spring_topic_queue_well2"/>
      </rabbit:bindings>
      </rabbit:topic-exchange>


      <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
      </beans>
    • MQ 网页控制台

      创建虚拟机并配置权限
    • 测试

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      package com.example.producers;

      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.test.context.ContextConfiguration;
      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

      @ContextConfiguration(locations = "classpath:spring-rabbitmq-producers.xml")
      @RunWith(SpringJUnit4ClassRunner.class)
      public class ProducersTest {
      @Autowired
      private RabbitTemplate rabbitTemplate;

      @Test
      public void helloWorldTest() {
      rabbitTemplate.convertAndSend("spring_queue", "hell world spring.....");
      }

      @Test
      public void fanoutTest() {
      rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "fannout spring.....");
      }

      @Test
      public void topicTest() {
      rabbitTemplate.convertAndSend("spring_topic_exchange", "coderitl.hehe.haha", "topic spring haha.....");
      rabbitTemplate.convertAndSend("spring_topic_exchange", "coderitl.hehe", "topic spring hehe.....");
      }
      }

      查看
  • 消费者

    • 创建项目,添加相同依赖

    • 创建属性配置文件

    • 创建rabbitmq 的配置文件spring-rabbitmq-consumers.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      xmlns="http://www.springframework.org/schema/beans"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/context
      https://www.springframework.org/schema/context/spring-context.xsd
      http://www.springframework.org/schema/rabbit
      http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
      <!--加载配置文件-->
      <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>


      <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
      port="${rabbitmq.port}"
      username="${rabbitmq.username}"
      password="${rabbitmq.password}"
      virtual-host="${rabbitmq.virtual-host}"/>

      <bean id="springQueueListener" class="com.coderitl.rabbitmq.listener.SpringQueueListener"/>
      <bean id="fanoutListener1" class="com.coderitl.rabbitmq.listener.FanoutListener1"/>
      <bean id="fanoutListener2" class="com.coderitl.rabbitmq.listener.FanoutListener2"/>
      <bean id="topicListenerStar" class="com.coderitl.rabbitmq.listener.TopicListenerStar"/>
      <bean id="topicListenerWell" class="com.coderitl.rabbitmq.listener.TopicListenerWell"/>
      <bean id="topicListenerWell2" class="com.coderitl.rabbitmq.listener.TopicListenerWell2"/>

      <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
      <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
      <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
      <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
      <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
      <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
      <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
      </rabbit:listener-container>
      </beans>
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      package com.coderitl.rabbitmq.listener;

      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessageListener;

      // SpringQueueListener 其他类似
      public class SpringQueueListener implements MessageListener {
      // 重写 onMessage
      @Override
      public void onMessage(Message message) {
      // 获取数据
      System.out.println(new String(message.getBody()));
      }
      }

    • 测试

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 只需要启动
      @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumers.xml")
      @RunWith(SpringJUnit4ClassRunner.class)
      public class ConsumersTest {
      @Test
      public void test() {
      boolean flag = true;
      while (flag) {
      }
      }
      }

SpringBoot整合 RabbitMQ

发布订阅模式

生产者
  • 创建项目,添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>
  • 创建配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    server:
    port: 8079

    spring:
    rabbitmq:
    host: 192.168.2.3
    port: 5672
    username: root
    password: root
    virtual-host: /spring-mq

  • 创建配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    @Configuration
    public class MyRabbitMQProducerConfig {
    // TODO: 确定实现目标类型(发布订阅模型) 【fanout*,direct,topics】
    public static final String EXCHANGE_NAME = "boot_fannout_exchange";

    // 声明交换机
    @Bean
    public FanoutExchange bootExchange() {
    return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    }

  • 创建测试,发送数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @SpringBootTest
    class BootMqProducerApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void sendMessage() throws UnsupportedEncodingException {
    String message = "我是本次发送的数据..............";
    rabbitTemplate.convertAndSend("boot_fannout_exchange", "", message.getBytes("UTF-8"));
    }

    }

消费者
  • 创建消费者项目,添加依赖,和生产者一致

  • 创建配置文件,和生产者一致

  • 创建配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    @Configuration
    public class MyRabbitMQConsumerConfig {
    // TODO: 确定实现目标类型(发布订阅模型) 【fannout*,direct,topics】
    public static final String EXCHANGE_NAME = "boot_fannout_exchange";
    public static final String QUEUE_NAME = "boot_fannout_queue";

    // 声明交换机
    @Bean
    public FanoutExchange bootExchange() {
    return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    // 声明队列
    @Bean
    public Queue bootQueue() {
    return new Queue(QUEUE_NAME, true, false, false);
    }

    // 声明绑定关系
    @Bean
    public Binding QueueBindToExchange(Queue queue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    }
  • 数据处理

    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    public class BootMqConsumers {
    // 监听队列: 当队列中有消息,则监听器工作,处理接收到的消息
    @RabbitListener(queues = "boot_fannout_queue")
    public void process(Message message) {
    System.out.println(new String(message.getBody()));
    }
    }
    启动测试

Topics模式

生产者
  • 生产者实现步骤

    1. 创建生产者SpringBoot 工程

    2. 引入依赖坐标

      1
      2
      3
      4
      5
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.7.2</version>
      </dependency>
    3. 编写yml 配置,基本信息配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      spring:
      rabbitmq:
      # ip
      host: 192.168.247.129
      # port
      port: 5672
      # user
      username: coder-itl
      # password
      password: coder-itl
      # VirtualHost
      virtual-host: /coderitl
    4. 声明交换机

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      package com.example.config;

      import org.springframework.amqp.core.TopicExchange;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      public class TopicModuleProducerConfig {
      // 交换机
      public static final String TOPIC_EXCHANGE = "topic_exchange";

      @Bean
      public TopicExchange topicExchange() {
      return new TopicExchange(TOPIC_EXCHANGE, false, false);
      }
      }

    5. 注入RabbitTemplate,调用方法,完成消息的发送

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      package com.example;

      import org.junit.jupiter.api.Test;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;

      import java.io.UnsupportedEncodingException;

      @SpringBootTest
      class BootMqTopicProducersApplicationTests {
      public static final String TOPIC_EXCHANGE = "topic_exchange";

      @Autowired
      private RabbitTemplate rabbitTemplate;

      @Test
      void sendMessage() throws UnsupportedEncodingException {
      String message = "boot send test data add..............";
      rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, "topic.module", message.getBytes("UTF-8"));
      }
      }

消费者
  • 消费者实现步骤

    1. 创建消费者SpringBoot 工程

    2. 引入starter

      1
      2
      3
      4
      5
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.7.2</version>
      </dependency>
    3. 编写yml 配置,进行基本信息配置

      1
      2
      3
      4
      5
      6
      7
      spring:
      rabbitmq:
      host: 192.168.247.129
      port: 5672
      username: coder-itl
      password: coder-itl
      virtual-host: /coderitl
    4. 声明交换机、队列以及绑定关系

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      package com.example.config.topic;

      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.core.TopicExchange;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      public class TopicModuleConfig {
      // 声明交换机和队列名称
      public static final String TOPIC_EXCHANGE = "topic_exchange";
      public static final String TOPIC_QUEUE = "topic_queue";
      public static final String ROUTING_KEY = "topic.*";

      @Bean
      public TopicExchange topicExchange() {
      return new TopicExchange(TOPIC_EXCHANGE, false, false);
      }

      @Bean
      public Queue queue() {
      return new Queue(TOPIC_QUEUE, false, false, false, null);
      }

      // 将队列绑定搭到交换机
      @Bean
      public Binding queueToExchange(Queue queue, TopicExchange topicExchange) {
      return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
      }
      }

    5. 定义监听类,使用@RabbitListener 注解完成队列监听

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      package com.example.config.consumer;

      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;

      @Component
      public class MyConsumer {
      // 监听队列: 当队列中有消息,则监听器工作,处理接收到的消息
      @RabbitListener(queues = "topic_queue")
      public void process(Message message) {
      System.out.println(new String(message.getBody()));
      }
      }

      获取到数据 绑定

      关于交换机的绑定可以在生产者处声明,以上大部分在消费者处声明,将模型图看作两部分,如下

      关系划分

手动 ACK 实现

  1. 消费端的配置文件中添加如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    server:
    port: 8089

    spring:
    rabbitmq:
    host: 192.168.2.3
    port: 5672
    username: root
    password: root
    virtual-host: /spring-mq
    listener:
    simple:
    acknowledge-mode: manual

  2. 在消息处理最后进行手动确认

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package com.example.consumer;
    // Channel: client 包下
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    @Component
    public class BootMqConsumers {
    // 监听队列: 当队列中有消息,则监听器工作,处理接收到的消息
    @RabbitListener(queues = "boot_fannout_queue")
    public void process(Message message, Channel channel) throws IOException {
    System.out.println(new String(message.getBody()));
    // 手动 ack 确认: 告知 broker 要签收的消息的 id(DeliveryTag()) false: 本次不批量
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    }

高级特性

消息的可靠性投递

  • 问题描述

    在使用RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的可靠性模式

    • confirm 确认模式
    • return 退回模式
  • rabbitmq 整个消息投递的路径

    投递的路径
    • confirm: 消息从producer exchange 则会返回一个confirmCallback

    • return: 消息从exchange queue 投递失败则会返回一个returnCallback

      我们将利用这两个callback 控制消息的可靠性投递

  • 可靠性投递的三个保障

    • 生产者把消息准确的投递给交换机【通过 confirm 机制】
    • 交换机准确无误的把消息投递给队列【通过 return 机制】
    • 队列准确无误的把消息推送给消费者【将自动 ack 修改为 手动 ack】
  • durable

    • durable 是持久化

    • 作用

      rabbitmq 重启时,在交换机处设置持久化为true 时,rabbitmq 重启时交换机依然会存在, 队列 一样

Confirm 机制

  • SpringBoot 中的实现

  • 修改生产者端的配置文件()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    spring:
    rabbitmq:
    host: 192.168.2.3
    port: 5672
    username: root
    password: root
    virtual-host: /spring-mq
    # 添加如下部分
    publisher-confirm-type: correlated
    • publisher-confirm-type 的取值
      1. simple: 简单的执行ack 的判断,在发布消息成功后使用rabbitmTemplate 调用waitForConfirms waitForConfirmsOrDie 方法等待broker 节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当waitForConfirmsOrDir 方法如果返回false 则会关闭channel
      2. correlated: 执行ack 的时候还会携带数据(消息的元数据)
      3. none: 禁用发布确认模式,默认的
  • 编写一个ConfirmCallback 的实现类,并注入到rabbitTemplate

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    package com.example.confirm;

    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.util.Objects;

    @Component
    public class MyConfirmCollback implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 把监听器注入到 RabbitTemplate
    @PostConstruct
    public void init() {
    rabbitTemplate.setConfirmCallback(this);
    }

    /**
    * @param correlationData 消息的元数据
    * @param ack 生产者发送消息到 broker,如果签收成功,返回 True,否则返回 False
    * @param cause 当 ack=False时的失败的原因
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    // 获取消息的 id
    String id = "";
    if (Objects.nonNull(correlationData)) {
    id = correlationData.getId();
    }
    if (ack) {
    // 消息投递成功
    System.out.println("消息投递成功.................: " + id);
    } else {
    // 消息投递失败,后续可以存入到缓存中,通过定时任务,定时重发
    System.out.println("消息投递失败.................: " + cause);
    }

    }

    }

return 机制

  • 修改生产者的配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spring:
    rabbitmq:
    host: 192.168.2.3
    username: root
    password: root
    port: 5672
    virtual-host: /coderitl
    publisher-confirm-type: correlated
    # 添加如下部分
    publisher-returns: true

  • 实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package com.example.confirm;

    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.util.Objects;

    @Component
    public class MyConfirmCollback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 把监听器注入到 RabbitTemplate
    @PostConstruct
    public void init() {
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnsCallback(this);
    }

    /**
    * @param correlationData 消息的元数据
    * @param ack 生产者发送消息到 broker,如果签收成功,返回 True,否则返回 False
    * @param cause 当 ack=False时的失败的原因
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    // 获取消息的 id
    String id = "";
    if (Objects.nonNull(correlationData)) {
    id = correlationData.getId();
    }
    if (ack) {
    // 消息投递成功
    System.out.println("消息投递成功.................: " + id);
    } else {
    // 消息投递失败,后续可以存入到缓存中,通过定时任务,定时重发
    System.out.println("消息投递失败.................: " + cause);
    }

    }

    /**
    * 当消息没有传递到队列的时候,回调的方法
    * ReturnedMessage:
    * private final Message message;
    * private final int replyCode;
    * private final String replyText;
    * private final String exchange;
    * private final String routingKey;
    *
    * @param returnedMessage
    */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
    System.out.println("消息: " + new String(returnedMessage.getMessage().getBody()) + "没有成功投递到队列 4");
    }
    }

    returnCallback 错误的routingKey

手动 ack,nack,reject的区别

  • 不做任何的ack

    RabbitMQ 会把消息标记成unacked,此时mq 是在等待消费者进行ack,如果消费者失去了会话,此时消息会重新回到ready 状态,被其他消费者消费

  • ack

    确认签收,之后消息会从队列中剔除

  • reject

    reject 就是拒绝此消息

    reject 一次只支持处理一条消息。消息被拒绝掉值后,并且requeue 设置成了false,将会进入到死信队列中。如果requeue 设置成true,将会重回到队列,但是这种情况很少使用。

    1
    2
    3
    4
    5
    // Consumer 消费的回调处这几个方法
    Consumer consumer = new DefaultConsumer(channel) {

    channel.basicNack(参数...);
    };
  • nack

    nackreject 相同,只是nack 支持批量处理多条消息

封装消息的元数据

保证消息的幂等性消费

  • 什么是幂等性

    幂等性: 多次操作造成的结果是一致的。对于非幂等的操作,幂等性如何保证?=>【使用分布式锁】

    • 在请求方式中的幂等性的体现

      • get: 多次get 结果是一致的
      • post: 添加,非幂等
      • put: 修改: 幂等,根据 id 修改
      • delete: 根据id 删除,幂等

      对于非幂等的请求,我们在业务里要做幂等性保证

  • 在消息队列中的幂等性体现

    消息队列中,很可能一条消息被冗余部署的多个消费者收到,对于非幂等的操作,比如用户的注册,就需要做幂等性保证,否则消息将会被重复消费。使用分布式锁解决幂等性问题。

  • message 的请求头中两个键值分别为

    • spring_listener_return_correlation: 该属性是用来确定消息被退回时调用哪个监听器
    • spring_returned_message_correlation: 该属性是指退回待确认消息的唯一标识
  • 业务代码中实现幂等性

    1. publisher-confirm-type: correlated 开启confirm 请求头中将会使⽤ “spring_returned_message_correlation键来传递业务id

      1
      2
      # ⽣产者端修改配置⽂件
      publisher-confirm-type: correlated
    2. ⽣产者端传递业务id

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @Test
      public void testSendMessage() {
      //业务id
      String id = UUID.randomUUID().toString();
      //封装了业务id的消息元数据
      CorrelationData correlationData = new CorrelationData(id);
      //发送消息,并且携带消息的业务id
      rabbitTemplate.convertAndSend("my_boot_topic_exchange",
      "product.add",
      "hello message",
      correlationData
      );
      }
    3. 消费者端进⾏业务逻辑判断

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      /**
      * 消费端的幂等性的实现
      */
      @RabbitListener(queues = "my_boot_topic_queue")
      public void processByMSG(Message message, Channel channel)
      throws IOException {
      //如何获得消息的业务id
      String messageId =
      message.getMessageProperties().getHeader("spring_returned_message_correlation");
      //设置分布式锁
      Boolean lock =
      redisTemplate.opsForValue().setIfAbsent(messageId, 1, 100000,TimeUnit.MILLISECONDS);
      if (lock) {
      //做消费
      }
      System.out.println("添加⽤户成功");
      //⼿动ack
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      }else{
      //不做消费
      System.out.println("已重复消费");

      channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
      }

死信队列

  • 死信队列的介绍

    死信队列 ,让⼀条消息,在满⾜⼀定的条件下,成为死信,会被发送到另⼀个交换 机上,再被消费。 这个过程就是死信队列的作⽤。 死信队列就可以做出“延迟”队列的效果。⽐如,在订单超时未⽀付 ,将订单状态改 成“已取消”,这个操作就可以使⽤死信队列来完成。设置消息的超时时间,当消息 超时则消息成为死信,于是通过监听死信队列的消费者来做取消订单的动作。

  • 消息成为死信的条件

    • 消息被拒签,并且没有重回队列,消息将成为死信。
    • 消息过期了,消息将成为死信。
    • 队列⻓度有限,存不下消息了,存不下的消息将会成为死信。