# 消息队列MQ ## 初识MQ ### **同步调用** image-20250527173401081 同步调用有3个问题: - **拓展性差**,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动 - **性能下降**,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和 - **级联失败**,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。 ### 异步调用 ![image-20250527175753038](https://pic.bitday.top/i/2025/05/27/t2dfdb-0.png) ### 技术选型 ![image-20250527190824767](https://pic.bitday.top/i/2025/05/27/vk3zfw-0.png) ## RabbitMQ ### 部署 ```yml mq: image: rabbitmq:3.8-management container_name: mq restart: unless-stopped hostname: mq environment: - TZ=Asia/Shanghai RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: "admin" RABBITMQ_PLUGINS_DIR: "/plugins:/custom-plugins" ports: - "15672:15672" - "5672:5672" volumes: - ./mq-plugins:/custom-plugins networks: - hmall-net ``` http://localhost:15672/ 访问控制台 ### 架构图 ![image-20250527200935901](https://pic.bitday.top/i/2025/05/27/x8b2ej-0.png) - **`publisher`**:生产者,发送消息的一方 - **`consumer`**:消费者,消费消息的一方 - **`queue`**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 - **`exchange`**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**不存储** - **`virtual host`**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue(每个项目+环境有各自的vhost) 一个队列最多指定给一个消费者! ## Spring AMQP ### 快速开始 **交换机和队列都是直接在控制台创建,消息的发送和接收在Java应用中实现!** 简单案例:直接向队列发送消息,**不经过交换机** ![image-20250528120304174](https://pic.bitday.top/i/2025/05/28/jwaiez-0.png) **引入依赖** ```xml org.springframework.boot spring-boot-starter-amqp ``` **配置MQ地址**,在`publisher`和`consumer`服务的`application.yml`中添加配置: ```yaml spring: rabbitmq: host: localhost # 你的虚拟机IP port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: hmall # 用户名 password: 123 # 密码 ``` **消息发送:** 然后在`publisher`服务中编写测试类`SpringAmqpTest`,并利用`RabbitTemplate`实现消息发送: ```java @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } } ``` **消息接收** ```java @Component public class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } } ``` 然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了! ### 交换机 **1)fanout:广播给每个绑定的队列** ![image-20250528133703660](https://pic.bitday.top/i/2025/05/28/m40swr-0.png) ![image-20250528132709273](https://pic.bitday.top/i/2025/05/28/ly3wne-0.png) 发送消息: `convertAndSend`如果2个参数,第一个表示队列名,第二个表示消息;如果3个参数,第一个表示交换机,第二个表示`RoutingKey`,第三个表示消息。 ```java @Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "hmall.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); } ``` **2)Direct交换机** - 队列与交换机的绑定,不能是任意绑定了,而是要**指定**一个`RoutingKey`(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息 **注意,RoutingKey不等于队列名称** ![image-20250528141029943](https://pic.bitday.top/i/2025/05/28/nbn2ku-0.png) **3)Topic交换机** `Topic`类型的`Exchange`与`Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。 只不过`Topic`类型`Exchange`可以让队列在绑定`BindingKey` 的时候使用**通配符**! BindingKey一般都是有一个或**多个单词**组成,多个单词之间以`.`分割 通配符规则: - `#`:匹配一个或多个词 - `*`:匹配不多不少恰好**1个词** 举例: - `item.#`:能够匹配`item.spu.insert` 或者 `item.spu` - `item.*`:只能匹配`item.spu` ### 基于注解声明交换机队列 以往我们都在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。 **基于注解方式来声明** `durable="true"`:队列在 RabbitMQ 重启后依然存在。 `type` 默认交换机类型为ExchangeTypes.DIRECT ```java @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1",durable = "true"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } ``` **检查队列** - 如果 RabbitMQ 中已经有名为 `direct.queue1` 的队列,就不会重复创建; - 如果不存在,`RabbitAdmin` 会自动帮你创建一个。 **检查交换机** - 同理,会查看有没有名为 `hmall.direct`、类型为 `direct` 的交换机,若不存在就新建。 **检查绑定** - 最后再去声明绑定关系:把 `direct.queue1` 绑定到 `hmall.direct`,并且 routing-key 为 `"red"` 和 `"blue"`。 - 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。 ### 消息转换器 使用JSON方式来做序列化和反序列化,替换掉默认方式。 1)引入依赖 ```XML com.fasterxml.jackson.dataformat jackson-dataformat-xml 2.9.10 ``` 2)配置消息转换器,在`publisher`和`consumer`两个服务的启动类中添加一个Bean即可: ```Java @Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; } ```