From 42f05b1608f2b5f9c457dbcd45cddf5d3909aedb Mon Sep 17 00:00:00 2001 From: zhangsan <646228430@qq.com> Date: Mon, 21 Jul 2025 16:26:23 +0800 Subject: [PATCH] =?UTF-8?q?7.21=20=E5=88=86=E5=B8=83=E5=BC=8F=E7=8E=AF?= =?UTF-8?q?=E5=A2=83=E4=B8=8B=EF=BC=8C=E4=BA=92=E6=96=A5=E9=94=81=E6=89=A7?= =?UTF-8?q?=E8=A1=8Cjob=EF=BC=9B=E6=97=A0=E9=94=81=E5=8C=96=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E9=AB=98=E5=B9=B6=E5=8F=91=E4=B8=8B=E6=8B=BC=E5=9B=A2?= =?UTF-8?q?=E9=94=81=E5=8D=95=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../trigger/MarketTradeControllerTest.java | 11 ++-- .../adapter/repository/ITradeRepository.java | 4 ++ .../entity/TradeLockRuleCommandEntity.java | 3 +- .../entity/TradeLockRuleFilterBackEntity.java | 3 + .../lock/TradeLockLockOrderService.java | 11 +++- .../factory/TradeLockRuleFilterFactory.java | 24 +++++++- .../filter/TeamStockOccupyRuleFilter.java | 59 +++++++++++++++++++ .../lock/filter/UserTakeLimitRuleFilter.java | 7 ++- .../adapter/repository/TradeRepository.java | 47 +++++++++++++++ .../whut/trigger/job/GroupBuyNotifyJob.java | 22 ++++++- .../edu/whut/types/enums/ResponseCode.java | 2 + 11 files changed, 176 insertions(+), 17 deletions(-) create mode 100644 group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/TeamStockOccupyRuleFilter.java diff --git a/group-buying-sys-app/src/test/java/edu/whut/test/trigger/MarketTradeControllerTest.java b/group-buying-sys-app/src/test/java/edu/whut/test/trigger/MarketTradeControllerTest.java index de189ea..c2a943c 100644 --- a/group-buying-sys-app/src/test/java/edu/whut/test/trigger/MarketTradeControllerTest.java +++ b/group-buying-sys-app/src/test/java/edu/whut/test/trigger/MarketTradeControllerTest.java @@ -27,8 +27,7 @@ public class MarketTradeControllerTest { @Test public void test_lockMarketPayOrder_mq() throws InterruptedException { LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO(); - lockMarketPayOrderRequestDTO.setUserId("smile01" + - ""); + lockMarketPayOrderRequestDTO.setUserId("smile01"); lockMarketPayOrderRequestDTO.setTeamId(null); lockMarketPayOrderRequestDTO.setActivityId(100124L); lockMarketPayOrderRequestDTO.setGoodsId("9890001"); @@ -48,9 +47,9 @@ public class MarketTradeControllerTest { @Test public void test_lockMarketPayOrder() { LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO(); - lockMarketPayOrderRequestDTO.setUserId("zy01"); + lockMarketPayOrderRequestDTO.setUserId("smile01"); lockMarketPayOrderRequestDTO.setTeamId(null); - lockMarketPayOrderRequestDTO.setActivityId(100124L); + lockMarketPayOrderRequestDTO.setActivityId(100123L); lockMarketPayOrderRequestDTO.setGoodsId("9890001"); lockMarketPayOrderRequestDTO.setSource("s01"); lockMarketPayOrderRequestDTO.setChannel("c01"); @@ -68,8 +67,8 @@ public class MarketTradeControllerTest { @Test public void test_lockMarketPayOrder_teamId_not_null() { LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO(); - lockMarketPayOrderRequestDTO.setUserId("zy04"); - lockMarketPayOrderRequestDTO.setTeamId("36689983"); + lockMarketPayOrderRequestDTO.setUserId("smile03"); + lockMarketPayOrderRequestDTO.setTeamId("60683575"); lockMarketPayOrderRequestDTO.setActivityId(100123L); lockMarketPayOrderRequestDTO.setGoodsId("9890001"); lockMarketPayOrderRequestDTO.setSource("s01"); diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/adapter/repository/ITradeRepository.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/adapter/repository/ITradeRepository.java index 4517da7..7bb31a3 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/adapter/repository/ITradeRepository.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/adapter/repository/ITradeRepository.java @@ -40,4 +40,8 @@ public interface ITradeRepository { int updateNotifyTaskStatusError(String teamId); int updateNotifyTaskStatusRetry(String teamId); + + boolean occupyTeamStock(String teamStockKey, String recoveryTeamStockKey, Integer target, Integer validTime); + + void recoveryTeamStock(String recoveryTeamStockKey, Integer validTime); } diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleCommandEntity.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleCommandEntity.java index 25ba87a..002f239 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleCommandEntity.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleCommandEntity.java @@ -18,5 +18,6 @@ public class TradeLockRuleCommandEntity { private String userId; /** 活动ID */ private Long activityId; - + /** 组队ID */ + private String teamId; } diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleFilterBackEntity.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleFilterBackEntity.java index 3dac1e5..16d3678 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleFilterBackEntity.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/model/entity/TradeLockRuleFilterBackEntity.java @@ -17,4 +17,7 @@ public class TradeLockRuleFilterBackEntity { // 用户参与活动的订单量 private Integer userTakeOrderCount; + // 恢复组队库存缓存key + private String recoveryTeamStockKey; + } diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/TradeLockLockOrderService.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/TradeLockLockOrderService.java index 859f73b..6837ad9 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/TradeLockLockOrderService.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/TradeLockLockOrderService.java @@ -46,6 +46,7 @@ public class TradeLockLockOrderService implements ITradeLockOrderService { TradeLockRuleFilterBackEntity tradeLockRuleFilterBackEntity = tradeRuleFilter.apply(TradeLockRuleCommandEntity.builder() .activityId(payActivityEntity.getActivityId()) .userId(userEntity.getUserId()) + .teamId(payActivityEntity.getTeamId()) .build(), new TradeLockRuleFilterFactory.DynamicContext()); @@ -60,8 +61,14 @@ public class TradeLockLockOrderService implements ITradeLockOrderService { .userTakeOrderCount(userTakeOrderCount) .build(); - // 锁定聚合订单 - 这会用户只是下单还没有支付。后续会有2个流程;支付成功、超时未支付(回退) - return repository.lockMarketPayOrder(groupBuyOrderAggregate); + try { + // 锁定聚合订单 - 这会用户只是下单还没有支付。后续会有2个流程;支付成功、超时未支付(回退) + return repository.lockMarketPayOrder(groupBuyOrderAggregate); + } catch (Exception e) { + // 记录失败恢复量 + repository.recoveryTeamStock(tradeLockRuleFilterBackEntity.getRecoveryTeamStockKey(), payActivityEntity.getValidTime()); + throw e; + } } } diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/factory/TradeLockRuleFilterFactory.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/factory/TradeLockRuleFilterFactory.java index a7a4f1e..18f7a04 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/factory/TradeLockRuleFilterFactory.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/factory/TradeLockRuleFilterFactory.java @@ -3,6 +3,7 @@ import edu.whut.domain.trade.model.entity.GroupBuyActivityEntity; import edu.whut.domain.trade.model.entity.TradeLockRuleCommandEntity; import edu.whut.domain.trade.model.entity.TradeLockRuleFilterBackEntity; import edu.whut.domain.trade.service.lock.filter.ActivityUsabilityRuleFilter; +import edu.whut.domain.trade.service.lock.filter.TeamStockOccupyRuleFilter; import edu.whut.domain.trade.service.lock.filter.UserTakeLimitRuleFilter; import edu.whut.types.design.framework.link.model2.LinkArmory; import edu.whut.types.design.framework.link.model2.chain.BusinessLinkedList; @@ -11,6 +12,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; @@ -22,13 +24,17 @@ import org.springframework.stereotype.Service; public class TradeLockRuleFilterFactory { /** + * 组装责任链 * 通过 Spring @Bean 暴露:外部只需注入 BusinessLinkedList 即可调用 apply */ @Bean("tradeRuleFilter") - public BusinessLinkedList tradeRuleFilter(ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter) { + public BusinessLinkedList tradeRuleFilter( + ActivityUsabilityRuleFilter activityUsabilityRuleFilter, + UserTakeLimitRuleFilter userTakeLimitRuleFilter, + TeamStockOccupyRuleFilter teamStockOccupyRuleFilter) { // 1. 组装链 LinkArmory linkArmory = - new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter); + new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter,teamStockOccupyRuleFilter); // 2. 返回链容器(即可作为责任链使用) return linkArmory.getLogicLink(); @@ -43,8 +49,22 @@ public class TradeLockRuleFilterFactory { @NoArgsConstructor public static class DynamicContext { /** 拼团活动信息,供后续节点复用 */ + private String teamOccupiedStockKey = "group_buy_market_team_occupied_stock_key_"; + private GroupBuyActivityEntity groupBuyActivity; + private Integer userTakeOrderCount; + + public String generateTeamStockKey(String teamId) { + if (StringUtils.isBlank(teamId)) return null; + return teamOccupiedStockKey + groupBuyActivity.getActivityId() + "_" + teamId; + } + + public String generateRecoveryTeamStockKey(String teamId) { + if (StringUtils.isBlank(teamId)) return null; + return teamOccupiedStockKey + groupBuyActivity.getActivityId() + "_" + teamId + "_recovery"; + } + } } diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/TeamStockOccupyRuleFilter.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/TeamStockOccupyRuleFilter.java new file mode 100644 index 0000000..ac2563c --- /dev/null +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/TeamStockOccupyRuleFilter.java @@ -0,0 +1,59 @@ +package edu.whut.domain.trade.service.lock.filter; + +import edu.whut.domain.trade.adapter.repository.ITradeRepository; +import edu.whut.domain.trade.model.entity.GroupBuyActivityEntity; +import edu.whut.domain.trade.model.entity.TradeLockRuleCommandEntity; +import edu.whut.domain.trade.model.entity.TradeLockRuleFilterBackEntity; +import edu.whut.domain.trade.service.lock.factory.TradeLockRuleFilterFactory; +import edu.whut.types.design.framework.link.model2.handler.ILogicHandler; +import edu.whut.types.enums.ResponseCode; +import edu.whut.types.exception.AppException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 组队库存占用规则过滤 + */ +@Slf4j +@Service +public class TeamStockOccupyRuleFilter implements ILogicHandler { + + @Resource + private ITradeRepository repository; + + @Override + public TradeLockRuleFilterBackEntity apply(TradeLockRuleCommandEntity requestParameter, TradeLockRuleFilterFactory.DynamicContext dynamicContext) throws Exception { + log.info("交易规则过滤-组队库存校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId()); + + // 1. teamId 为空,则为首次开团,不做拼团组队目标量库存限制 + String teamId = requestParameter.getTeamId(); + if (StringUtils.isBlank(teamId)) { + return TradeLockRuleFilterBackEntity.builder() + .userTakeOrderCount(dynamicContext.getUserTakeOrderCount()) + .build(); + } + + // 2. 参与拼团; 抢占库存;通过抢占 Redis 缓存库存,来降低对数据库的操作压力。 + GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity(); + Integer target = groupBuyActivity.getTarget(); + Integer validTime = groupBuyActivity.getValidTime(); + String teamOccupiedStockKey = dynamicContext.generateTeamStockKey(teamId); + String recoveryTeamStockKey = dynamicContext.generateRecoveryTeamStockKey(teamId); + //抢占库存 + boolean status = repository.occupyTeamStock(teamOccupiedStockKey, recoveryTeamStockKey, target, validTime); + + if (!status) { + log.warn("交易规则过滤-组队库存校验{} activityId:{} 抢占失败:{}", requestParameter.getUserId(), requestParameter.getActivityId(), teamOccupiedStockKey); + throw new AppException(ResponseCode.E0008); + } + + return TradeLockRuleFilterBackEntity.builder() + .userTakeOrderCount(dynamicContext.getUserTakeOrderCount()) + .recoveryTeamStockKey(recoveryTeamStockKey) + .build(); + } + +} diff --git a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/UserTakeLimitRuleFilter.java b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/UserTakeLimitRuleFilter.java index 652a40d..f9eaa28 100644 --- a/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/UserTakeLimitRuleFilter.java +++ b/group-buying-sys-domain/src/main/java/edu/whut/domain/trade/service/lock/filter/UserTakeLimitRuleFilter.java @@ -36,9 +36,10 @@ public class UserTakeLimitRuleFilter implements ILogicHandler target + recoveryCount) { + redisService.setAtomicLong(teamOccupiedStockKey, target); + return false; + } + + // 4) 如果用到了补偿名额(序号已经 > target),就从补偿池里减掉一个; + //if (occupySeq > target) { + // redisService.decr(recoveryKey); + //} + + // 1. 给每个产生的值加锁为兜底设计,虽然incr操作是原子的,基本不会产生一样的值。但在实际生产中,遇到过集群的运维配置问题,以及业务运营配置数据问题,导致incr得到的值相同。 + // 2. validTime + 60分钟,是一个延后时间的设计,让数据保留时间稍微长一些,便于排查问题。 + String lockKey = teamOccupiedStockKey + Constants.UNDERLINE + occupy; + Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES); + + if (!lock) { + log.info("组队库存加锁失败 {}", lockKey); + } + + return lock; + } + + @Override + public void recoveryTeamStock(String recoveryteamOccupiedStockKey, Integer validTime) { + // 首次组队拼团,是没有 teamId 的,所以不需要这个做处理。 + if (StringUtils.isBlank(recoveryteamOccupiedStockKey)) return; + + redisService.incr(recoveryteamOccupiedStockKey); + } } diff --git a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/job/GroupBuyNotifyJob.java b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/job/GroupBuyNotifyJob.java index 1d673ba..f3475be 100644 --- a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/job/GroupBuyNotifyJob.java +++ b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/job/GroupBuyNotifyJob.java @@ -3,9 +3,12 @@ import com.alibaba.fastjson.JSON; import edu.whut.domain.trade.service.ITradeSettlementOrderService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 拼团完结回调通知任务;拼团回调任务表,实际公司场景会定时清理数据结转,不会有太多数据挤压 @@ -17,14 +20,27 @@ public class GroupBuyNotifyJob { private final ITradeSettlementOrderService tradeSettlementOrderService; - //每30秒执行一次 - @Scheduled(cron = "0/30 * * * * ?") + private final RedissonClient redissonClient; + + //每天零点执行一次 + @Scheduled(cron = "0 0 0 * * ?") public void exec() { + // 为什么加锁?分布式应用N台机器部署互备(一个应用实例挂了,还有另外可用的),任务调度会有N个同时执行,那么这里需要增加抢占机制,谁抢占到谁就执行。完毕后,下一轮继续抢占。 + // 获取锁句柄,并未真正获取锁 + RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec"); try { - Map result = tradeSettlementOrderService. execSettlementNotifyJob(); + //尝试获取锁 waitTime = 3:如果当前锁已经被别人持有,调用线程最多等待 3 秒去重试获取;leaseTime = 0:不设过期时间,看门狗机制 + boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS); + if (!isLocked) return; + + Map result = tradeSettlementOrderService.execSettlementNotifyJob(); log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result)); } catch (Exception e) { log.error("定时任务,回调通知拼团完结任务失败", e); + } finally { + if (lock.isLocked() && lock.isHeldByCurrentThread()) { + lock.unlock(); + } } } diff --git a/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java b/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java index 6624e26..5569f7e 100644 --- a/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java +++ b/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java @@ -23,6 +23,8 @@ public enum ResponseCode { E0005("E0005", "拼团组队失败,记录更新为0"), E0006("E0006", "拼团组队完结,锁单量已达成"), E0007("E0007", "拼团人群限定,不可参与"), + E0008("E0008", "拼团组队失败,缓存库存不足"), + E0101("E0101", "拼团活动未生效"), E0102("E0102", "不在拼团活动有效时间内"), E0103("E0103", "当前用户参与此拼团次数已达上限"),