7.20 消费MQ结算消息

This commit is contained in:
zhangsan 2025-07-20 22:10:23 +08:00
parent 3c54ed79d4
commit a346dfdfb1
12 changed files with 195 additions and 22 deletions

View File

@ -12,6 +12,7 @@ app:
group-buy-market: group-buy-market:
api-url: http://127.0.0.1:8091 api-url: http://127.0.0.1:8091
notify-url: http://127.0.0.1:8092/api/v1/alipay/group_buy_notify notify-url: http://127.0.0.1:8092/api/v1/alipay/group_buy_notify
notify-type: MQ
source: s01 source: s01
chanel: c01 chanel: c01
@ -56,6 +57,16 @@ spring:
delivery-mode: persistent # 确保全局默认设置为持久化(可选) delivery-mode: persistent # 确保全局默认设置为持久化(可选)
# 消息配置 # 消息配置
config: config:
# 生产者
producer:
# 主题配置
topic_order_pay_success:
# 绑定交换机
exchange: pay_mall_exchange
# 消费主题
routing_key: topic.order_pay_success
# 消费队列
queue: pay_mall_queue_2_order_pay_success
consumer: consumer:
# 消费 topic 主题team_success # 消费 topic 主题team_success
topic_team_success: topic_team_success:
@ -65,6 +76,14 @@ spring:
routing_key: topic.team_success routing_key: topic.team_success
# 消费队列 - 每个系统有自己的消费队列 # 消费队列 - 每个系统有自己的消费队列
queue: pay_mall_queue_2_topic_team_success queue: pay_mall_queue_2_topic_team_success
# 主题配置
topic_order_pay_success:
# 绑定交换机
exchange: pay_mall_exchange
# 消费主题
routing_key: topic.order_pay_success
# 消费队列
queue: pay_mall_queue_2_order_pay_success
# MyBatis 配置【如需使用记得打开】 # MyBatis 配置【如需使用记得打开】
mybatis: mybatis:

View File

@ -3,10 +3,12 @@ package edu.whut.test;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent; import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent;
import edu.whut.infrastructure.event.EventPublisher;
import edu.whut.types.event.BaseEvent; import edu.whut.types.event.BaseEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
@ -24,6 +26,12 @@ public class ApiTest {
@Resource @Resource
private PaySuccessMessageEvent paySuccessMessageEvent; private PaySuccessMessageEvent paySuccessMessageEvent;
@Resource
private EventPublisher eventPublisher;
@Value("${spring.rabbitmq.config.producer.topic_order_pay_success.routing_key}")
private String TOPIC_ORDER_PAY_SUCCESS;
@Test @Test
public void test() throws InterruptedException { public void test() throws InterruptedException {
@ -37,4 +45,16 @@ public class ApiTest {
new CountDownLatch(1).await(); new CountDownLatch(1).await();
} }
@Test
public void test_eventPublisher() throws InterruptedException {
BaseEvent.EventMessage<PaySuccessMessageEvent.PaySuccessMessage> paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage(
PaySuccessMessageEvent.PaySuccessMessage.builder()
.tradeNo("1100000111")
.build());
eventPublisher.publish(TOPIC_ORDER_PAY_SUCCESS, JSON.toJSONString(paySuccessMessageEventMessage.getData()));
new CountDownLatch(1).await();
}
} }

View File

@ -5,6 +5,7 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
@ -13,6 +14,15 @@ import java.util.Date;
@Component @Component
public class PaySuccessMessageEvent extends BaseEvent<PaySuccessMessageEvent.PaySuccessMessage> { public class PaySuccessMessageEvent extends BaseEvent<PaySuccessMessageEvent.PaySuccessMessage> {
@Value("${spring.rabbitmq.config.producer.topic_order_pay_success.routing_key}")
private String TOPIC_ORDER_PAY_SUCCESS;
/**
* 构造一个携带业务数据的 EventMessage
* id随机生成的唯一流水号
* timestamp当前时间
* data具体的 PaySuccessMessage 对象
*/
@Override @Override
public EventMessage<PaySuccessMessage> buildEventMessage(PaySuccessMessage data) { public EventMessage<PaySuccessMessage> buildEventMessage(PaySuccessMessage data) {
return EventMessage.<PaySuccessMessage>builder() return EventMessage.<PaySuccessMessage>builder()
@ -24,7 +34,7 @@ public class PaySuccessMessageEvent extends BaseEvent<PaySuccessMessageEvent.Pay
@Override @Override
public String topic() { public String topic() {
return "pay_success"; return TOPIC_ORDER_PAY_SUCCESS;
} }
@Data @Data

View File

@ -131,6 +131,12 @@ public class OrderService extends AbstractOrderService{
return repository.changeOrderClose(orderId); return repository.changeOrderClose(orderId);
} }
/**
* 拼团成功后
* 将该拼团中的交易号列表对应的拼团订单更新为已结算状态
* 并逐条发送支付成功消息到事件总线
*/
@Override @Override
public void changeOrderMarketSettlement(List<String> outTradeNoList) { public void changeOrderMarketSettlement(List<String> outTradeNoList) {
repository.changeOrderMarketSettlement(outTradeNoList); repository.changeOrderMarketSettlement(outTradeNoList);

View File

@ -39,6 +39,10 @@
<artifactId>group-buying-sys-api</artifactId> <artifactId>group-buying-sys-api</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 系统模块 --> <!-- 系统模块 -->
<dependency> <dependency>
<groupId>edu.whut</groupId> <groupId>edu.whut</groupId>

View File

@ -32,7 +32,8 @@ public class ProductPort implements IProductPort {
private String chanel; private String chanel;
@Value("${app.config.group-buy-market.notify-url}") @Value("${app.config.group-buy-market.notify-url}")
private String notifyUrl; private String notifyUrl;
@Value("${app.config.group-buy-market.notify-type}")
private String notifyType; //http or mq
private final IGroupBuyMarketService groupBuyMarketService; private final IGroupBuyMarketService groupBuyMarketService;
private final ProductRPC productRPC; private final ProductRPC productRPC;
@ -68,7 +69,13 @@ public class ProductPort implements IProductPort {
requestDTO.setSource(source); requestDTO.setSource(source);
requestDTO.setChannel(chanel); requestDTO.setChannel(chanel);
requestDTO.setOutTradeNo(orderId); requestDTO.setOutTradeNo(orderId);
requestDTO.setNotifyUrl(notifyUrl);
// 根据配置决定用 HTTP 回调还是 MQ
if ("HTTP".equalsIgnoreCase(notifyType)) {
requestDTO.setNotifyUrl(notifyUrl);
} else {
requestDTO.setNotifyMQ();
}
try { try {
// 发起 HTTP 请求执行营销锁单 // 发起 HTTP 请求执行营销锁单

View File

@ -13,6 +13,7 @@ import edu.whut.domain.order.model.valobj.MarketTypeVO;
import edu.whut.domain.order.model.valobj.OrderStatusVO; import edu.whut.domain.order.model.valobj.OrderStatusVO;
import edu.whut.infrastructure.dao.IOrderDao; import edu.whut.infrastructure.dao.IOrderDao;
import edu.whut.infrastructure.dao.po.PayOrder; import edu.whut.infrastructure.dao.po.PayOrder;
import edu.whut.infrastructure.event.EventPublisher;
import edu.whut.types.event.BaseEvent; import edu.whut.types.event.BaseEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@ -31,6 +32,8 @@ public class OrderRepository implements IOrderRepository {
private final EventBus eventBus; private final EventBus eventBus;
private final EventPublisher eventPublisher;
/** /**
* 保存新订单到数据库 * 保存新订单到数据库
*/ */
@ -124,7 +127,10 @@ public class OrderRepository implements IOrderRepository {
.build() .build()
); );
PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = evtMsg.getData(); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = evtMsg.getData();
eventBus.post(JSON.toJSONString(paySuccessMessage)); //发布到 EventBus // 旧版发送消息方式
// eventBus.post(JSON.toJSONString(paySuccessMessage)); //发布到 EventBus
eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage));
} }
/** /**
@ -164,20 +170,32 @@ public class OrderRepository implements IOrderRepository {
} }
/**
* 拼团成功后
* 将该拼团中的交易号列表对应的拼团订单更新为已结算状态
* 并逐条发送支付成功消息到事件总线
*/
@Override @Override
public void changeOrderMarketSettlement(List<String> outTradeNoList) { public void changeOrderMarketSettlement(List<String> outTradeNoList) {
// 更新拼团结算状态 // 1. 更新数据库中这些订单的拼团结算状态为已结算
orderDao.changeOrderMarketSettlement(outTradeNoList); orderDao.changeOrderMarketSettlement(outTradeNoList);
// 循环成功发送消息 - 一般在公司的场景里还会有job任务扫描超时没有结算的订单查询订单状态查询对方服务端的接口会被限制一次查询多少频次多少 // 2. 遍历每个交易号构造支付成功事件并发送到 MQ或其它事件总线
outTradeNoList.forEach(outTradeNo -> { outTradeNoList.forEach(outTradeNo -> {
BaseEvent.EventMessage<PaySuccessMessageEvent.PaySuccessMessage> paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage( // 构建事件消息体
PaySuccessMessageEvent.PaySuccessMessage.builder() BaseEvent.EventMessage<PaySuccessMessageEvent.PaySuccessMessage> msg =
.tradeNo(outTradeNo) paySuccessMessageEvent.buildEventMessage(
.build()); PaySuccessMessageEvent.PaySuccessMessage.builder()
PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = paySuccessMessageEventMessage.getData(); .tradeNo(outTradeNo) // 填入当前交易号
.build()
);
// 获取消息体中的业务数据
PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = msg.getData();
// 旧版发送消息方式
// eventBus.post(JSON.toJSONString(paySuccessMessage));
eventBus.post(JSON.toJSONString(paySuccessMessage)); // 发布消息到指定 TopicRouting Key
eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage));
}); });
} }

View File

@ -0,0 +1,36 @@
package edu.whut.infrastructure.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息发送
*/
@Slf4j
@Component
public class EventPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${spring.rabbitmq.config.producer.topic_order_pay_success.exchange}")
private String exchangeName;
public void publish(String routingKey, String message) {
try {
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, m -> {
// 持久化消息配置
m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return m;
});
} catch (Exception e) {
log.error("发送MQ消息失败 routingKey:{} message:{}", routingKey, message, e);
throw e;
}
}
}

View File

@ -84,7 +84,7 @@ public class AliPayController implements IPayService {
orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList()); orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList());
return "success"; return "success";
} catch (Exception e) { } catch (Exception e) {
log.info("拼团回调,组队完成,结算失败 {}", JSON.toJSONString(requestDTO)); log.error("拼团回调,组队完成,结算失败 {}", JSON.toJSONString(requestDTO));
return "error"; return "error";
} }
} }

View File

@ -6,9 +6,15 @@ import edu.whut.domain.goods.service.IGoodsService;
import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent; import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* 先TeamSuccessTopicListener再OrderPaySuccessListener
* 支付成功回调消息 * 支付成功回调消息
*/ */
@Slf4j @Slf4j
@ -17,16 +23,44 @@ import org.springframework.stereotype.Component;
public class OrderPaySuccessListener { public class OrderPaySuccessListener {
private final IGoodsService goodsService; private final IGoodsService goodsService;
@Subscribe
public void handleEvent(String paySuccessMessageJson) {
log.info("收到支付成功消息 {}", paySuccessMessageJson);
PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class); // @Subscribe - 旧版发布订阅方式
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.queue}"),
exchange = @Exchange(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.exchange}", type = ExchangeTypes.TOPIC),
key = "${spring.rabbitmq.config.consumer.topic_order_pay_success.routing_key}"
)
)
public void listener(String paySuccessMessageJson) {
try {
log.info("收到支付成功消息 {}", paySuccessMessageJson);
log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo()); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class);
// 变更订单状态 - 发货完成&结算 log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo());
goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo());
// 变更订单状态 - 发货完成&结算
goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo());
// 可以打开测试MQ 消费失败会抛异常之后重试消费这个也是最终执行的重要手段
// throw new RuntimeException("重试消费");
} catch (Exception e) {
log.error("收到支付成功消息失败 {}", paySuccessMessageJson,e);
throw e;
}
} }
// @Subscribe
// public void handleEvent(String paySuccessMessageJson) {
// log.info("收到支付成功消息 {}", paySuccessMessageJson);
//
// PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class);
//
// log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo());
//
// // 变更订单状态 - 发货完成&结算
// goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo());
// }
} }

View File

@ -1,5 +1,8 @@
package edu.whut.trigger.listener; package edu.whut.trigger.listener;
import com.alibaba.fastjson.JSON;
import edu.whut.api.dto.NotifyRequestDTO;
import edu.whut.domain.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Exchange;
@ -8,13 +11,18 @@ import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* 结算完成消息监听 * 拼团完成消息监听
*/ */
@Slf4j @Slf4j
@Component @Component
public class TeamSuccessTopicListener { public class TeamSuccessTopicListener {
@Resource
private IOrderService orderService;
/** /**
* 指定消费队列 * 指定消费队列
*/ */
@ -26,7 +34,15 @@ public class TeamSuccessTopicListener {
) )
) )
public void listener(String message) { public void listener(String message) {
log.info("接收消息:{}", message); try {
NotifyRequestDTO requestDTO = JSON.parseObject(message, NotifyRequestDTO.class);
log.info("拼团回调,组队完成,结算开始 {}", JSON.toJSONString(requestDTO));
// 营销结算
orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList());
} catch (Exception e) {
log.error("拼团回调,组队完成,结算失败 {}", message, e);
throw e;
}
} }
} }

View File

@ -10,6 +10,9 @@ import java.util.Date;
@Data @Data
public abstract class BaseEvent<T> { public abstract class BaseEvent<T> {
/**
* 构建事件消息各子类会根据 T 的类型生成包含 idtimestampdata 三个字段的 EventMessage
*/
public abstract EventMessage<T> buildEventMessage(T data); public abstract EventMessage<T> buildEventMessage(T data);
public abstract String topic(); public abstract String topic();