md_files/自学/消息队列MQ.md

629 lines
21 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 消息队列MQ
## 初识MQ
### **同步调用**
<img src="https://pic.bitday.top/i/2025/05/27/so4pss-0.png" alt="image-20250527173401081" style="zoom: 67%;" />
同步调用有3个问题
- **拓展性差**,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动
- **性能下降**,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和
- **级联失败**,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
### 异步调用
![image-20250527175753038](https://pic.bitday.top/i/2025/05/27/t2dfdb-0.png)
### 技术选型
![image-20250527190824767](https://pic.bitday.top/i/2025/05/27/vk3zfw-0.png)
## RabbitMQ
### 部署
```yml
mq: #消息队列
image: rabbitmq:3.8-management
container_name: mq
restart: unless-stopped
hostname: mq
environment:
TZ: "Asia/Shanghai"
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: "admin"
ports:
- "15672:15672"
- "5672:5672"
volumes:
- mq-plugins:/plugins
# 持久化数据卷,保存用户/队列/交换机等元数据
- ./mq-data:/var/lib/rabbitmq
networks:
- hmall-net
volumes:
mq-plugins:
```
http://localhost:15672/ 访问控制台
### 架构图
![image-20250527200935901](https://pic.bitday.top/i/2025/05/27/x8b2ej-0.png)
- **`publisher`**:生产者,发送消息的一方
- **`consumer`**:消费者,消费消息的一方
- **`queue`**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- **`exchange`**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**不存储**
- **`virtual host`**虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue每个项目+环境有各自的vhost
一个队列最多指定给一个消费者!
## Spring AMQP
### 快速开始
**交换机和队列都是直接在控制台创建消息的发送和接收在Java应用中实现**
简单案例:直接向队列发送消息,**不经过交换机**
![image-20250528120304174](https://pic.bitday.top/i/2025/05/28/jwaiez-0.png)
**引入依赖**
```xml
<!--AMQP依赖包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
**配置MQ地址**,在`publisher``consumer`服务的`application.yml`中添加配置:
```yaml
spring:
rabbitmq:
host: localhost # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
```
**消息发送:**
然后在`publisher`服务中编写测试类`SpringAmqpTest`,并利用**`RabbitTemplate`**实现消息发送:
```java
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
```
**消息接收**
```java
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
```
然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了!
### 交换机
**1fanout广播给每个绑定的队列**
![image-20250528133703660](https://pic.bitday.top/i/2025/05/28/m40swr-0.png)
![image-20250528132709273](https://pic.bitday.top/i/2025/05/28/ly3wne-0.png)
发送消息:
`convertAndSend`如果2个参数第一个表示队列名第二个表示消息如果3个参数第一个表示交换机第二个表示`RoutingKey`,第三个表示消息。
```java
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
```
**2Direct交换机**
- 队列与交换机的绑定,不能是任意绑定了,而是要**指定**一个`RoutingKey`路由key
- 消息的发送方在 向 Exchange发送消息时也必须指定消息的 `RoutingKey`
- Exchange不再把消息交给每一个绑定的队列而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息
**注意RoutingKey不等于队列名称**
![image-20250528141029943](https://pic.bitday.top/i/2025/05/28/nbn2ku-0.png)
**3Topic交换机**
`Topic`类型的`Exchange``Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。
只不过`Topic`类型`Exchange`可以让队列在绑定`BindingKey` 的时候使用**通配符**
BindingKey一般都是有一个或**多个单词**组成,多个单词之间以`.`分割
通配符规则:
- `#`:匹配一个或多个词
- `*`:匹配不多不少恰好**1个词**
举例:
- `item.#`:能够匹配`item.spu.insert` 或者 `item.spu`
- `item.*`:只能匹配`item.spu`
### 基于注解声明交换机、队列
以往我们都在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。
**基于注解方式来声明**
`type` 默认交换机类型为ExchangeTypes.DIRECT
```java
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息【" + msg + "】");
}
```
**检查队列**
- 如果 RabbitMQ 中已经有名为 `direct.queue1` 的队列,就不会重复创建;
- 如果不存在,`RabbitAdmin` 会自动帮你创建一个。
**检查交换机**
- 同理,会查看有没有名为 `hmall.direct`、类型为 `direct` 的交换机,若不存在就新建。
**检查绑定**
- 最后再去声明绑定关系:把 `direct.queue1` 绑定到 `hmall.direct`,并且 routing-key 为 `"red"``"blue"`
- 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。
### 消息转换器
使用JSON方式来做序列化和反序列化替换掉默认方式。
更小或可压缩的消息体、易读、易调试
1引入依赖
```XML
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
```
2配置消息转换器`publisher``consumer`两个服务的启动类中添加一个Bean即可
```Java
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
```
## MQ高级
我们要解决消息丢失问题保证MQ的可靠性就必须从3个方面入手
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消费者一定要处理消息
### 发送者的可靠性
#### **发送者重试**
修改发送者模块的`application.yaml`文件,添加下面的内容:
```YAML
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
```
- *阻塞重试,一般不建议开启。*
#### **发送者确认**
![image-20250529170017117](https://pic.bitday.top/i/2025/05/29/s49oln-0.png)
- 当消息投递到MQ但是**路由失败**没有队列绑定交换机、或者你routingKey设置错误等通过**Publisher Return**返回异常信息同时返回ack的确认信息代表投递成功
- 临时消息投递到了MQ并且入队成功返回ACK告知投递成功
- 持久消息投递到了MQ并且入队完成持久化返回ACK ,告知投递成功
- 其它情况都会返回NACK告知投递失败
1.在发送者模块的`application.yaml`中添加配置:
```YAML
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制并设置confirm类型
publisher-returns: true # 开启publisher return机制
```
- `none`关闭confirm机制
- `simple`同步阻塞等待MQ的回执
- `correlated`MQ异步回调返回回执
2.每个`RabbitTemplate`只能配置一个`ReturnCallback`,因此我们可以在**配置类**中统一设置。我们在publisher模块定义一个配置类
```java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
```
3.定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同因此ConfirmCallback需要在每次发消息时定义。具体来说是在调用RabbitTemplate中的convertAndSend方法时多传递一个参数
![image-20250529180404355](https://pic.bitday.top/i/2025/05/29/tu2fve-0.png)
这里的CorrelationData中包含两个核心的东西
- `id`消息的唯一标示MQ对不同的消息的回执以此做判断避免混淆
- `SettableListenableFuture`回执结果的Future对象
```java
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑参数中的result就是回执内容
if(result.isAck()){ // result.isAck()boolean类型true代表ack回执false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason()String类型返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
```
**端到端投递保障**
- **ConfirmCallback** 只告诉你:消息“到”了 RabbitMQ 服务器吗ACKNACK没到
- **ReturnsCallback** 只告诉你:到达服务器的消息,能“进”队列吗?(能进就不回;进不了就退)
两者都成功,才能确认:“这条消息真的安全地进了队列,等着消费者去拿。”
- *开启生产者确认比较消耗MQ性能一般不建议开启。*
### MQ的可靠性
#### 数据持久化
为了保证数据的可靠性,必须配置数据持久化(从内存保存到磁盘上),包括:
- 交换机持久化选Durable
- 队列持久化选Durable
- 消息持久化选Persistent
控制台方式:
![image-20250530154302987](https://pic.bitday.top/i/2025/05/30/pipl8z-0.png)
![image-20250530154321546](https://pic.bitday.top/i/2025/05/30/pis74e-0.png)
这样重启后还能恢复。
代码方式,默认都是持久化的,不用变动。
### 消费者可靠性
#### 消费者确认机制
当消费者**处理消息结束**后向RabbitMQ返回自己的处理状态MQ做出相应反应...
![image-20250530160814244](https://pic.bitday.top/i/2025/05/30/qlfwrn-0.png)
上述的NACK状态时MQ会**不断向消费者重投**消息,直至被正确处理!!!
在消费者方通过下面的配置可以修改消费者收到消息后的ACK处理方式
none,收到消息就直接返回ack
manual手动实现ack
**auto**自动档业务异常返回nack 消息处理异常返回reject其他ack (默认也是这个模式)
```yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
```
#### 消费者重试
- 类似发送者的重试机制,在消费者出现异常时利用**本地重试**而不是无限制的requeue到mq队列。
- 重试达到最大次数后、会返回reject消息会被丢弃
修改consumer服务的application.yml文件添加内容
```YAML
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
```
**Stateless无状态重试**:所有的重试都在**同一个事务上下文**里进行,直到重试次数用尽后才会把异常抛回容器并最终回滚或提交事务。
**Stateful有状态重试**:每次重试都会被当成一次**独立**的消息交付Spring 会为每次尝试开启新的事务,失败时立即回滚并重新投递,下次重试又是一个干净的事务环境。这对事务性操作(@Transactional)来说,能保证“第 N 次重试失败就回滚第 N 次的事务”,避免把所有尝试都裹在一笔大事务里
#### 失败处理策略
前面默认的达到最大重试次数后,消息会被丢弃,对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略这个策略是由`MessageRecovery`接口来定义的它有3个不同实现
- `RejectAndDontRequeueRecoverer`:重试耗尽后,直接`reject`,丢弃消息。默认就是这种方式
- `ImmediateRequeueMessageRecoverer`:重试耗尽后,返回`nack`,消息重新入队
- `RepublishMessageRecoverer`:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是`RepublishMessageRecoverer`,失败后将消息投递到一个指定的,专门存放异常消息的交换机->队列,后续由人工集中处理。
#### 业务幂等性
在程序开发中,幂等则是指同一个业务,执行一次或多次对业务状态的影响是一致的。如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
**所以我们要尽可能避免业务被重复执行MQ消息的重复投递、页面卡顿时频繁刷新导致表单重复提交、服务间调用的重试**
法一唯一ID
1. 每一条消息都生成一个唯一的id与消息一起投递给消费者。
2. 消费者接收到消息后处理自己的业务业务处理成功后将消息ID保存到数据库
3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
法一存在业务侵入因为mq的消息ID与业务无关现在却多了一张专门记录 ID 的表或结构
法二:业务判断,基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
![image-20250603151010579](https://pic.bitday.top/i/2025/06/03/oz399u-0.png)
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
- 首先支付服务会正在用户支付成功以后利用MQ消息通知交易服务完成订单状态同步。
- 其次为了保证MQ消息的可靠性我们采用了生产者确认机制、消费者确认、消费者失败重试等策略确保消息投递的可靠性
- 最后我们还在交易服务设置了定时任务定期查询订单支付状态。这样即便MQ通知失败还可以利用定时任务作为兜底方案确保订单支付状态的最终一致性。
### 延迟消息
对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
例如订单支付超时时间为30分钟则我们应该在用户下单后的第30分钟检查订单支付状态如果发现未支付应该立刻取消订单释放库存。
![image-20250603154136058](https://pic.bitday.top/i/2025/06/03/phnk4w-0.png)
#### 延迟消息插件
1.下载
[GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)
2.上传插件由于之前docker部署MQ挂载了数据卷
```shell
docker volume ls #查看所有数据卷
docker volume inspect hmall_all_mq-plugins #获取数据卷的目录
#"Mountpoint": "/var/lib/docker/volumes/hmall_all_mq-plugins/_data"
```
我们上传插件到该目录下。
3.安装插件
```shell
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
![image-20250603163744968](https://pic.bitday.top/i/2025/06/03/r2v3hh-0.png)
#### 声明延迟交换机
额外指定参数 `delayed = "true"`
```Java
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息{}", msg);
}
```
#### 发送延迟消息
```Java
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
```
#### 超时订单问题
![image-20250603171922542](https://pic.bitday.top/i/2025/06/03/sfjqx5-0.png)
三种可能性1.如果用户支付成功,但是消息通知失败(未传递给订单服务),那么会导致数据不一致情况,这时延迟消息到达后,它会先看本地订单状态,发现处于'待支付'状态此时不确定是否是真的未支付还是消息通知失败需要再openfeign调用支付服务查询支付流水状态发现支付成功那么就更新本地订单状态为'已支付'。
2.如果用户支付成功且消息通知成功,那么订单服务会更新订单状态为'已支付',延迟消息到达时查询本地订单状态,确实'已支付'直接return。
3.用户确实到时间了扔未支付,此时本地订单状态和远程的支付流水状态都是'待支付',此时取消订单、恢复库存。