diff --git a/pay-mall-app/src/main/resources/application-dev.yml b/pay-mall-app/src/main/resources/application-dev.yml index ab6497b..551ff27 100644 --- a/pay-mall-app/src/main/resources/application-dev.yml +++ b/pay-mall-app/src/main/resources/application-dev.yml @@ -12,6 +12,7 @@ app: group-buy-market: api-url: http://127.0.0.1:8091 notify-url: http://127.0.0.1:8092/api/v1/alipay/group_buy_notify + notify-type: MQ source: s01 chanel: c01 @@ -56,6 +57,16 @@ spring: delivery-mode: persistent # 确保全局默认设置为持久化(可选) # 消息配置 config: + # 生产者 + producer: + # 主题配置 + topic_order_pay_success: + # 绑定交换机 + exchange: pay_mall_exchange + # 消费主题 + routing_key: topic.order_pay_success + # 消费队列 + queue: pay_mall_queue_2_order_pay_success consumer: # 消费 topic 主题,team_success topic_team_success: @@ -65,6 +76,14 @@ spring: routing_key: topic.team_success # 消费队列 - 每个系统有自己的消费队列 queue: pay_mall_queue_2_topic_team_success + # 主题配置 + topic_order_pay_success: + # 绑定交换机 + exchange: pay_mall_exchange + # 消费主题 + routing_key: topic.order_pay_success + # 消费队列 + queue: pay_mall_queue_2_order_pay_success # MyBatis 配置【如需使用记得打开】 mybatis: diff --git a/pay-mall-app/src/test/java/edu/whut/test/ApiTest.java b/pay-mall-app/src/test/java/edu/whut/test/ApiTest.java index d665826..478c4b3 100644 --- a/pay-mall-app/src/test/java/edu/whut/test/ApiTest.java +++ b/pay-mall-app/src/test/java/edu/whut/test/ApiTest.java @@ -3,10 +3,12 @@ package edu.whut.test; import com.alibaba.fastjson.JSON; import com.google.common.eventbus.EventBus; import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent; +import edu.whut.infrastructure.event.EventPublisher; import edu.whut.types.event.BaseEvent; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @@ -24,6 +26,12 @@ public class ApiTest { @Resource private PaySuccessMessageEvent paySuccessMessageEvent; + @Resource + private EventPublisher eventPublisher; + + @Value("${spring.rabbitmq.config.producer.topic_order_pay_success.routing_key}") + private String TOPIC_ORDER_PAY_SUCCESS; + @Test public void test() throws InterruptedException { @@ -37,4 +45,16 @@ public class ApiTest { new CountDownLatch(1).await(); } + @Test + public void test_eventPublisher() throws InterruptedException { + BaseEvent.EventMessage paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage( + PaySuccessMessageEvent.PaySuccessMessage.builder() + .tradeNo("1100000111") + .build()); + + eventPublisher.publish(TOPIC_ORDER_PAY_SUCCESS, JSON.toJSONString(paySuccessMessageEventMessage.getData())); + + new CountDownLatch(1).await(); + } + } diff --git a/pay-mall-domain/src/main/java/edu/whut/domain/order/adapter/event/PaySuccessMessageEvent.java b/pay-mall-domain/src/main/java/edu/whut/domain/order/adapter/event/PaySuccessMessageEvent.java index 27df05d..e9a04a2 100644 --- a/pay-mall-domain/src/main/java/edu/whut/domain/order/adapter/event/PaySuccessMessageEvent.java +++ b/pay-mall-domain/src/main/java/edu/whut/domain/order/adapter/event/PaySuccessMessageEvent.java @@ -5,6 +5,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.Date; @@ -13,6 +14,15 @@ import java.util.Date; @Component public class PaySuccessMessageEvent extends BaseEvent { + @Value("${spring.rabbitmq.config.producer.topic_order_pay_success.routing_key}") + private String TOPIC_ORDER_PAY_SUCCESS; + + /** + * 构造一个携带业务数据的 EventMessage, + * id:随机生成的唯一流水号 + * timestamp:当前时间 + * data:具体的 PaySuccessMessage 对象 + */ @Override public EventMessage buildEventMessage(PaySuccessMessage data) { return EventMessage.builder() @@ -24,7 +34,7 @@ public class PaySuccessMessageEvent extends BaseEvent outTradeNoList) { repository.changeOrderMarketSettlement(outTradeNoList); diff --git a/pay-mall-infrastructure/pom.xml b/pay-mall-infrastructure/pom.xml index 8efbd73..1ae46e1 100644 --- a/pay-mall-infrastructure/pom.xml +++ b/pay-mall-infrastructure/pom.xml @@ -39,6 +39,10 @@ group-buying-sys-api 1.0-SNAPSHOT + + org.springframework.boot + spring-boot-starter-amqp + edu.whut diff --git a/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/port/ProductPort.java b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/port/ProductPort.java index 3cf8270..a385d79 100644 --- a/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/port/ProductPort.java +++ b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/port/ProductPort.java @@ -32,7 +32,8 @@ public class ProductPort implements IProductPort { private String chanel; @Value("${app.config.group-buy-market.notify-url}") private String notifyUrl; - + @Value("${app.config.group-buy-market.notify-type}") + private String notifyType; //http or mq private final IGroupBuyMarketService groupBuyMarketService; private final ProductRPC productRPC; @@ -68,7 +69,13 @@ public class ProductPort implements IProductPort { requestDTO.setSource(source); requestDTO.setChannel(chanel); requestDTO.setOutTradeNo(orderId); - requestDTO.setNotifyUrl(notifyUrl); + + // 根据配置决定用 HTTP 回调还是 MQ + if ("HTTP".equalsIgnoreCase(notifyType)) { + requestDTO.setNotifyUrl(notifyUrl); + } else { + requestDTO.setNotifyMQ(); + } try { // 发起 HTTP 请求,执行营销锁单 diff --git a/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/repository/OrderRepository.java b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/repository/OrderRepository.java index 3976020..3b3e7d8 100644 --- a/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/repository/OrderRepository.java +++ b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/adapter/repository/OrderRepository.java @@ -13,6 +13,7 @@ import edu.whut.domain.order.model.valobj.MarketTypeVO; import edu.whut.domain.order.model.valobj.OrderStatusVO; import edu.whut.infrastructure.dao.IOrderDao; import edu.whut.infrastructure.dao.po.PayOrder; +import edu.whut.infrastructure.event.EventPublisher; import edu.whut.types.event.BaseEvent; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; @@ -31,6 +32,8 @@ public class OrderRepository implements IOrderRepository { private final EventBus eventBus; + private final EventPublisher eventPublisher; + /** * 保存新订单到数据库 */ @@ -124,7 +127,10 @@ public class OrderRepository implements IOrderRepository { .build() ); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = evtMsg.getData(); - eventBus.post(JSON.toJSONString(paySuccessMessage)); //发布到 EventBus + // 旧版发送消息方式 +// eventBus.post(JSON.toJSONString(paySuccessMessage)); //发布到 EventBus + + eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage)); } /** @@ -164,20 +170,32 @@ public class OrderRepository implements IOrderRepository { } + /** + * 拼团成功后 + * 将该拼团中的交易号列表对应的拼团订单更新为“已结算”状态, + * 并逐条发送支付成功消息到事件总线 + */ @Override public void changeOrderMarketSettlement(List outTradeNoList) { - // 更新拼团结算状态 + // 1. 更新数据库中这些订单的“拼团结算状态”为已结算 orderDao.changeOrderMarketSettlement(outTradeNoList); - // 循环成功发送消息 - 一般在公司的场景里,还会有job任务扫描超时没有结算的订单,查询订单状态。查询对方服务端的接口,会被限制一次查询多少,频次多少。 + // 2. 遍历每个交易号,构造支付成功事件并发送到 MQ(或其它事件总线) outTradeNoList.forEach(outTradeNo -> { - BaseEvent.EventMessage paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage( - PaySuccessMessageEvent.PaySuccessMessage.builder() - .tradeNo(outTradeNo) - .build()); - PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = paySuccessMessageEventMessage.getData(); + // 构建事件消息体 + BaseEvent.EventMessage msg = + paySuccessMessageEvent.buildEventMessage( + PaySuccessMessageEvent.PaySuccessMessage.builder() + .tradeNo(outTradeNo) // 填入当前交易号 + .build() + ); + // 获取消息体中的业务数据 + PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = msg.getData(); + // 旧版发送消息方式 +// eventBus.post(JSON.toJSONString(paySuccessMessage)); - eventBus.post(JSON.toJSONString(paySuccessMessage)); + // 发布消息到指定 Topic(Routing Key) + eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage)); }); } diff --git a/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/event/EventPublisher.java b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/event/EventPublisher.java new file mode 100644 index 0000000..1bfa7c3 --- /dev/null +++ b/pay-mall-infrastructure/src/main/java/edu/whut/infrastructure/event/EventPublisher.java @@ -0,0 +1,36 @@ +package edu.whut.infrastructure.event; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.MessageDeliveryMode; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 消息发送 + */ +@Slf4j +@Component +public class EventPublisher { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Value("${spring.rabbitmq.config.producer.topic_order_pay_success.exchange}") + private String exchangeName; + + public void publish(String routingKey, String message) { + try { + rabbitTemplate.convertAndSend(exchangeName, routingKey, message, m -> { + // 持久化消息配置 + m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); + return m; + }); + } catch (Exception e) { + log.error("发送MQ消息失败 routingKey:{} message:{}", routingKey, message, e); + throw e; + } + } + +} diff --git a/pay-mall-trigger/src/main/java/edu/whut/trigger/http/AliPayController.java b/pay-mall-trigger/src/main/java/edu/whut/trigger/http/AliPayController.java index 10cdb6c..bf3a7b8 100644 --- a/pay-mall-trigger/src/main/java/edu/whut/trigger/http/AliPayController.java +++ b/pay-mall-trigger/src/main/java/edu/whut/trigger/http/AliPayController.java @@ -84,7 +84,7 @@ public class AliPayController implements IPayService { orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList()); return "success"; } catch (Exception e) { - log.info("拼团回调,组队完成,结算失败 {}", JSON.toJSONString(requestDTO)); + log.error("拼团回调,组队完成,结算失败 {}", JSON.toJSONString(requestDTO)); return "error"; } } diff --git a/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/OrderPaySuccessListener.java b/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/OrderPaySuccessListener.java index e469e8a..52ee40d 100644 --- a/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/OrderPaySuccessListener.java +++ b/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/OrderPaySuccessListener.java @@ -6,9 +6,15 @@ import edu.whut.domain.goods.service.IGoodsService; import edu.whut.domain.order.adapter.event.PaySuccessMessageEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** + * 先TeamSuccessTopicListener,再OrderPaySuccessListener * 支付成功回调消息 */ @Slf4j @@ -17,16 +23,44 @@ import org.springframework.stereotype.Component; public class OrderPaySuccessListener { private final IGoodsService goodsService; - @Subscribe - public void handleEvent(String paySuccessMessageJson) { - log.info("收到支付成功消息 {}", paySuccessMessageJson); - PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class); + // @Subscribe - 旧版发布订阅方式 + @RabbitListener( + bindings = @QueueBinding( + value = @Queue(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.queue}"), + exchange = @Exchange(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.exchange}", type = ExchangeTypes.TOPIC), + key = "${spring.rabbitmq.config.consumer.topic_order_pay_success.routing_key}" + ) + ) + public void listener(String paySuccessMessageJson) { + try { + log.info("收到支付成功消息 {}", paySuccessMessageJson); - log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo()); + PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class); - // 变更订单状态 - 发货完成&结算 - goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo()); + log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo()); + + // 变更订单状态 - 发货完成&结算 + goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo()); + + // 可以打开测试,MQ 消费失败,会抛异常,之后重试消费。这个也是最终执行的重要手段。 + // throw new RuntimeException("重试消费"); + } catch (Exception e) { + log.error("收到支付成功消息失败 {}", paySuccessMessageJson,e); + throw e; + } } +// @Subscribe +// public void handleEvent(String paySuccessMessageJson) { +// log.info("收到支付成功消息 {}", paySuccessMessageJson); +// +// PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class); +// +// log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo()); +// +// // 变更订单状态 - 发货完成&结算 +// goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo()); +// } + } diff --git a/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/TeamSuccessTopicListener.java b/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/TeamSuccessTopicListener.java index fff155b..4a6c41f 100644 --- a/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/TeamSuccessTopicListener.java +++ b/pay-mall-trigger/src/main/java/edu/whut/trigger/listener/TeamSuccessTopicListener.java @@ -1,5 +1,8 @@ package edu.whut.trigger.listener; +import com.alibaba.fastjson.JSON; +import edu.whut.api.dto.NotifyRequestDTO; +import edu.whut.domain.order.service.IOrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; @@ -8,13 +11,18 @@ import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + /** - * 结算完成消息监听 + * 拼团完成消息监听 */ @Slf4j @Component public class TeamSuccessTopicListener { + @Resource + private IOrderService orderService; + /** * 指定消费队列 */ @@ -26,7 +34,15 @@ public class TeamSuccessTopicListener { ) ) public void listener(String message) { - log.info("接收消息:{}", message); + try { + NotifyRequestDTO requestDTO = JSON.parseObject(message, NotifyRequestDTO.class); + log.info("拼团回调,组队完成,结算开始 {}", JSON.toJSONString(requestDTO)); + // 营销结算 + orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList()); + } catch (Exception e) { + log.error("拼团回调,组队完成,结算失败 {}", message, e); + throw e; + } } } diff --git a/pay-mall-types/src/main/java/edu/whut/types/event/BaseEvent.java b/pay-mall-types/src/main/java/edu/whut/types/event/BaseEvent.java index 6a89298..0e3b344 100644 --- a/pay-mall-types/src/main/java/edu/whut/types/event/BaseEvent.java +++ b/pay-mall-types/src/main/java/edu/whut/types/event/BaseEvent.java @@ -10,6 +10,9 @@ import java.util.Date; @Data public abstract class BaseEvent { + /** + * 构建事件消息:各子类会根据 T 的类型生成包含 id、timestamp、data 三个字段的 EventMessage + */ public abstract EventMessage buildEventMessage(T data); public abstract String topic();