From 9b61eb101819bc06c3386252786344b73420125c Mon Sep 17 00:00:00 2001 From: zhangsan <646228430@qq.com> Date: Wed, 4 Jun 2025 16:30:54 +0800 Subject: [PATCH] =?UTF-8?q?6.4=20=E5=B0=86mq=E6=8A=BD=E5=8F=96=E4=B8=BA?= =?UTF-8?q?=E5=85=AC=E5=85=B1=E6=96=B9=E6=B3=95=E5=86=99=E5=9C=A8common?= =?UTF-8?q?=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/hmall/common/config/MqConfig.java | 32 +++++++++++ .../hmall/common}/constants/MQConstants.java | 6 ++- .../hmall/common/utils/RabbitMqHelper.java | 54 +++++++++++++++++++ .../main/resources/META-INF/spring.factories | 3 +- .../pay/service/impl/PayOrderServiceImpl.java | 15 ++++-- .../listener/OrderDelayMessageListener.java | 2 +- .../trade/service/impl/OrderServiceImpl.java | 13 ++--- 7 files changed, 110 insertions(+), 15 deletions(-) create mode 100644 hm-common/src/main/java/com/hmall/common/config/MqConfig.java rename {trade-service/src/main/java/com/hmall/trade => hm-common/src/main/java/com/hmall/common}/constants/MQConstants.java (52%) create mode 100644 hm-common/src/main/java/com/hmall/common/utils/RabbitMqHelper.java diff --git a/hm-common/src/main/java/com/hmall/common/config/MqConfig.java b/hm-common/src/main/java/com/hmall/common/config/MqConfig.java new file mode 100644 index 0000000..2e0d1e1 --- /dev/null +++ b/hm-common/src/main/java/com/hmall/common/config/MqConfig.java @@ -0,0 +1,32 @@ +package com.hmall.common.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.hmall.common.utils.RabbitMqHelper; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnClass(value = {MessageConverter.class, RabbitTemplate.class}) +public class MqConfig { + + @Bean + @ConditionalOnBean(ObjectMapper.class) + public MessageConverter messageConverter(ObjectMapper mapper){ + // 1.定义消息转换器 + Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper); + // 2.配置自动创建消息id,用于识别不同消息 + jackson2JsonMessageConverter.setCreateMessageIds(true); + return jackson2JsonMessageConverter; + } + + //注入一个 RabbitMqHelper 帮助类 + @Bean + public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate){ + return new RabbitMqHelper(rabbitTemplate); + } +} \ No newline at end of file diff --git a/trade-service/src/main/java/com/hmall/trade/constants/MQConstants.java b/hm-common/src/main/java/com/hmall/common/constants/MQConstants.java similarity index 52% rename from trade-service/src/main/java/com/hmall/trade/constants/MQConstants.java rename to hm-common/src/main/java/com/hmall/common/constants/MQConstants.java index c69ae3a..2c2e4b3 100644 --- a/trade-service/src/main/java/com/hmall/trade/constants/MQConstants.java +++ b/hm-common/src/main/java/com/hmall/common/constants/MQConstants.java @@ -1,7 +1,11 @@ -package com.hmall.trade.constants; +package com.hmall.common.constants; public interface MQConstants { String DELAY_EXCHANGE_NAME = "trade.delay.direct"; String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue"; String DELAY_ORDER_KEY = "delay.order.query"; + + //支付成功消息的交换器和路由键 + String PAY_EXCHANGE_NAME = "pay.direct"; + String PAY_SUCCESS_ROUTING_KEY = "pay.success"; } \ No newline at end of file diff --git a/hm-common/src/main/java/com/hmall/common/utils/RabbitMqHelper.java b/hm-common/src/main/java/com/hmall/common/utils/RabbitMqHelper.java new file mode 100644 index 0000000..b2bc6e9 --- /dev/null +++ b/hm-common/src/main/java/com/hmall/common/utils/RabbitMqHelper.java @@ -0,0 +1,54 @@ +package com.hmall.common.utils; + +import cn.hutool.core.lang.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.util.concurrent.ListenableFutureCallback; + +@Slf4j +@RequiredArgsConstructor +public class RabbitMqHelper { + + private final RabbitTemplate rabbitTemplate; + + public void sendMessage(String exchange, String routingKey, Object msg){ + log.debug("准备发送消息,exchange:{}, routingKey:{}, msg:{}", exchange, routingKey, msg); + rabbitTemplate.convertAndSend(exchange, routingKey, msg); + } + + public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay){ + rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { + message.getMessageProperties().setDelay(delay); + return message; + }); + } + + public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries){ + log.debug("准备发送消息,exchange:{}, routingKey:{}, msg:{}", exchange, routingKey, msg); + CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true)); + cd.getFuture().addCallback(new ListenableFutureCallback<>() { + int retryCount; + @Override + public void onFailure(Throwable ex) { + log.error("处理ack回执失败", ex); + } + @Override + public void onSuccess(CorrelationData.Confirm result) { + if (result != null && !result.isAck()) { + log.debug("消息发送失败,收到nack,已重试次数:{}", retryCount); + if(retryCount >= maxRetries){ + log.error("消息发送重试次数耗尽,发送失败"); + return; + } + CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true)); + cd.getFuture().addCallback(this); + rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd); + retryCount++; + } + } + }); + rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd); + } +} \ No newline at end of file diff --git a/hm-common/src/main/resources/META-INF/spring.factories b/hm-common/src/main/resources/META-INF/spring.factories index ea50f47..7aa72c4 100644 --- a/hm-common/src/main/resources/META-INF/spring.factories +++ b/hm-common/src/main/resources/META-INF/spring.factories @@ -1,4 +1,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.hmall.common.config.MyBatisConfig,\ com.hmall.common.config.JsonConfig,\ - com.hmall.common.config.MvcConfig \ No newline at end of file + com.hmall.common.config.MqConfig,\ + com.hmall.common.config.MvcConfig diff --git a/pay-service/src/main/java/com/hmall/pay/service/impl/PayOrderServiceImpl.java b/pay-service/src/main/java/com/hmall/pay/service/impl/PayOrderServiceImpl.java index 5c6af44..5193d6d 100644 --- a/pay-service/src/main/java/com/hmall/pay/service/impl/PayOrderServiceImpl.java +++ b/pay-service/src/main/java/com/hmall/pay/service/impl/PayOrderServiceImpl.java @@ -6,8 +6,10 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmall.api.client.TradeClient; import com.hmall.api.client.UserClient; +import com.hmall.common.constants.MQConstants; import com.hmall.common.exception.BizIllegalException; import com.hmall.common.utils.BeanUtils; +import com.hmall.common.utils.RabbitMqHelper; import com.hmall.common.utils.UserContext; import com.hmall.enums.PayStatus; import com.hmall.pay.domain.dto.PayApplyDTO; @@ -42,8 +44,7 @@ public class PayOrderServiceImpl extends ServiceImpl i private final TradeClient tradeClient; - private final RabbitTemplate rabbitTemplate; - + private final RabbitMqHelper rabbitMqHelper; @Override public String applyPayOrder(PayApplyDTO applyDTO) { // 1.幂等性校验 @@ -72,9 +73,15 @@ public class PayOrderServiceImpl extends ServiceImpl i // 5.修改订单状态 // tradeClient.markOrderPaySuccess(po.getBizOrderNo()); try { - rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo()); + // 用常量替代硬编码,并调用 RabbitMqHelper + rabbitMqHelper.sendMessage( + MQConstants.PAY_EXCHANGE_NAME, + MQConstants.PAY_SUCCESS_ROUTING_KEY, + po.getBizOrderNo() + ); } catch (Exception e) { - log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e); + log.error("支付成功的消息发送失败,支付单id:{},交易单id:{}", + po.getId(), po.getBizOrderNo(), e); } } diff --git a/trade-service/src/main/java/com/hmall/trade/listener/OrderDelayMessageListener.java b/trade-service/src/main/java/com/hmall/trade/listener/OrderDelayMessageListener.java index 11d5b61..ce5743f 100644 --- a/trade-service/src/main/java/com/hmall/trade/listener/OrderDelayMessageListener.java +++ b/trade-service/src/main/java/com/hmall/trade/listener/OrderDelayMessageListener.java @@ -2,7 +2,7 @@ package com.hmall.trade.listener; import com.hmall.api.client.PayClient; import com.hmall.api.dto.PayOrderDTO; -import com.hmall.trade.constants.MQConstants; +import com.hmall.common.constants.MQConstants; import com.hmall.trade.domain.po.Order; import com.hmall.trade.service.IOrderService; import lombok.RequiredArgsConstructor; diff --git a/trade-service/src/main/java/com/hmall/trade/service/impl/OrderServiceImpl.java b/trade-service/src/main/java/com/hmall/trade/service/impl/OrderServiceImpl.java index 5882501..08bf293 100644 --- a/trade-service/src/main/java/com/hmall/trade/service/impl/OrderServiceImpl.java +++ b/trade-service/src/main/java/com/hmall/trade/service/impl/OrderServiceImpl.java @@ -7,10 +7,11 @@ import com.hmall.api.dto.ItemDTO; import com.hmall.api.dto.OrderDetailDTO; import com.hmall.common.exception.BadRequestException; import com.hmall.common.utils.BeanUtils; +import com.hmall.common.utils.RabbitMqHelper; import com.hmall.common.utils.UserContext; -import com.hmall.trade.constants.MQConstants; +import com.hmall.common.constants.MQConstants; import com.hmall.trade.domain.dto.OrderFormDTO; import com.hmall.trade.domain.po.Order; import com.hmall.trade.domain.po.OrderDetail; @@ -19,7 +20,6 @@ import com.hmall.trade.service.IOrderDetailService; import com.hmall.trade.service.IOrderService; import io.seata.spring.annotation.GlobalTransactional; import lombok.RequiredArgsConstructor; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -45,7 +45,7 @@ public class OrderServiceImpl extends ServiceImpl implements private final ItemClient itemClient; private final IOrderDetailService detailService; private final CartClient cartClient; - private final RabbitTemplate rabbitTemplate; + private final RabbitMqHelper rabbitMqHelper; @Override @GlobalTransactional @@ -90,14 +90,11 @@ public class OrderServiceImpl extends ServiceImpl implements throw new RuntimeException("库存不足!"); } // 5.发送延迟消息,检测订单支付状态 - rabbitTemplate.convertAndSend( + rabbitMqHelper.sendDelayMessage( MQConstants.DELAY_EXCHANGE_NAME, MQConstants.DELAY_ORDER_KEY, order.getId(), - message -> { - message.getMessageProperties().setDelay(900000); //延迟15分钟 - return message; - } + 900000 // 延迟 15 分钟 (毫秒) ); return order.getId(); }