6.4 将mq抽取为公共方法写在common中

This commit is contained in:
zhangsan 2025-06-04 16:30:54 +08:00
parent 6111d6eb95
commit 9b61eb1018
7 changed files with 110 additions and 15 deletions

View File

@ -0,0 +1,32 @@
package com.hmall.common.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hmall.common.utils.RabbitMqHelper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnClass(value = {MessageConverter.class, RabbitTemplate.class})
public class MqConfig {
@Bean
@ConditionalOnBean(ObjectMapper.class)
public MessageConverter messageConverter(ObjectMapper mapper){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
// 2.配置自动创建消息id用于识别不同消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
//注入一个 RabbitMqHelper 帮助类
@Bean
public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate){
return new RabbitMqHelper(rabbitTemplate);
}
}

View File

@ -1,7 +1,11 @@
package com.hmall.trade.constants;
package com.hmall.common.constants;
public interface MQConstants {
String DELAY_EXCHANGE_NAME = "trade.delay.direct";
String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";
String DELAY_ORDER_KEY = "delay.order.query";
//支付成功消息的交换器和路由键
String PAY_EXCHANGE_NAME = "pay.direct";
String PAY_SUCCESS_ROUTING_KEY = "pay.success";
}

View File

@ -0,0 +1,54 @@
package com.hmall.common.utils;
import cn.hutool.core.lang.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Slf4j
@RequiredArgsConstructor
public class RabbitMqHelper {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object msg){
log.debug("准备发送消息exchange:{}, routingKey:{}, msg:{}", exchange, routingKey, msg);
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay){
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
message.getMessageProperties().setDelay(delay);
return message;
});
}
public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries){
log.debug("准备发送消息exchange:{}, routingKey:{}, msg:{}", exchange, routingKey, msg);
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true));
cd.getFuture().addCallback(new ListenableFutureCallback<>() {
int retryCount;
@Override
public void onFailure(Throwable ex) {
log.error("处理ack回执失败", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result != null && !result.isAck()) {
log.debug("消息发送失败收到nack已重试次数{}", retryCount);
if(retryCount >= maxRetries){
log.error("消息发送重试次数耗尽,发送失败");
return;
}
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true));
cd.getFuture().addCallback(this);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);
retryCount++;
}
}
});
rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);
}
}

View File

@ -1,4 +1,5 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.hmall.common.config.MyBatisConfig,\
com.hmall.common.config.JsonConfig,\
com.hmall.common.config.MvcConfig
com.hmall.common.config.MqConfig,\
com.hmall.common.config.MvcConfig

View File

@ -6,8 +6,10 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmall.api.client.TradeClient;
import com.hmall.api.client.UserClient;
import com.hmall.common.constants.MQConstants;
import com.hmall.common.exception.BizIllegalException;
import com.hmall.common.utils.BeanUtils;
import com.hmall.common.utils.RabbitMqHelper;
import com.hmall.common.utils.UserContext;
import com.hmall.enums.PayStatus;
import com.hmall.pay.domain.dto.PayApplyDTO;
@ -42,8 +44,7 @@ public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> i
private final TradeClient tradeClient;
private final RabbitTemplate rabbitTemplate;
private final RabbitMqHelper rabbitMqHelper;
@Override
public String applyPayOrder(PayApplyDTO applyDTO) {
// 1.幂等性校验
@ -72,9 +73,15 @@ public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> i
// 5.修改订单状态
// tradeClient.markOrderPaySuccess(po.getBizOrderNo());
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
// 用常量替代硬编码并调用 RabbitMqHelper
rabbitMqHelper.sendMessage(
MQConstants.PAY_EXCHANGE_NAME,
MQConstants.PAY_SUCCESS_ROUTING_KEY,
po.getBizOrderNo()
);
} catch (Exception e) {
log.error("支付成功的消息发送失败支付单id{} 交易单id{}", po.getId(), po.getBizOrderNo(), e);
log.error("支付成功的消息发送失败支付单id{}交易单id{}",
po.getId(), po.getBizOrderNo(), e);
}
}

View File

@ -2,7 +2,7 @@ package com.hmall.trade.listener;
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.trade.constants.MQConstants;
import com.hmall.common.constants.MQConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;

View File

@ -7,10 +7,11 @@ import com.hmall.api.dto.ItemDTO;
import com.hmall.api.dto.OrderDetailDTO;
import com.hmall.common.exception.BadRequestException;
import com.hmall.common.utils.BeanUtils;
import com.hmall.common.utils.RabbitMqHelper;
import com.hmall.common.utils.UserContext;
import com.hmall.trade.constants.MQConstants;
import com.hmall.common.constants.MQConstants;
import com.hmall.trade.domain.dto.OrderFormDTO;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.domain.po.OrderDetail;
@ -19,7 +20,6 @@ import com.hmall.trade.service.IOrderDetailService;
import com.hmall.trade.service.IOrderService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -45,7 +45,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
private final ItemClient itemClient;
private final IOrderDetailService detailService;
private final CartClient cartClient;
private final RabbitTemplate rabbitTemplate;
private final RabbitMqHelper rabbitMqHelper;
@Override
@GlobalTransactional
@ -90,14 +90,11 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
throw new RuntimeException("库存不足!");
}
// 5.发送延迟消息检测订单支付状态
rabbitTemplate.convertAndSend(
rabbitMqHelper.sendDelayMessage(
MQConstants.DELAY_EXCHANGE_NAME,
MQConstants.DELAY_ORDER_KEY,
order.getId(),
message -> {
message.getMessageProperties().setDelay(900000); //延迟15分钟
return message;
}
900000 // 延迟 15 分钟 (毫秒)
);
return order.getId();
}