7.21 分布式环境下,互斥锁执行job;无锁化控制高并发下拼团锁单数

This commit is contained in:
zhangsan 2025-07-21 16:26:23 +08:00
parent 09769679af
commit 42f05b1608
11 changed files with 176 additions and 17 deletions

View File

@ -27,8 +27,7 @@ public class MarketTradeControllerTest {
@Test
public void test_lockMarketPayOrder_mq() throws InterruptedException {
LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO();
lockMarketPayOrderRequestDTO.setUserId("smile01" +
"");
lockMarketPayOrderRequestDTO.setUserId("smile01");
lockMarketPayOrderRequestDTO.setTeamId(null);
lockMarketPayOrderRequestDTO.setActivityId(100124L);
lockMarketPayOrderRequestDTO.setGoodsId("9890001");
@ -48,9 +47,9 @@ public class MarketTradeControllerTest {
@Test
public void test_lockMarketPayOrder() {
LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO();
lockMarketPayOrderRequestDTO.setUserId("zy01");
lockMarketPayOrderRequestDTO.setUserId("smile01");
lockMarketPayOrderRequestDTO.setTeamId(null);
lockMarketPayOrderRequestDTO.setActivityId(100124L);
lockMarketPayOrderRequestDTO.setActivityId(100123L);
lockMarketPayOrderRequestDTO.setGoodsId("9890001");
lockMarketPayOrderRequestDTO.setSource("s01");
lockMarketPayOrderRequestDTO.setChannel("c01");
@ -68,8 +67,8 @@ public class MarketTradeControllerTest {
@Test
public void test_lockMarketPayOrder_teamId_not_null() {
LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO();
lockMarketPayOrderRequestDTO.setUserId("zy04");
lockMarketPayOrderRequestDTO.setTeamId("36689983");
lockMarketPayOrderRequestDTO.setUserId("smile03");
lockMarketPayOrderRequestDTO.setTeamId("60683575");
lockMarketPayOrderRequestDTO.setActivityId(100123L);
lockMarketPayOrderRequestDTO.setGoodsId("9890001");
lockMarketPayOrderRequestDTO.setSource("s01");

View File

@ -40,4 +40,8 @@ public interface ITradeRepository {
int updateNotifyTaskStatusError(String teamId);
int updateNotifyTaskStatusRetry(String teamId);
boolean occupyTeamStock(String teamStockKey, String recoveryTeamStockKey, Integer target, Integer validTime);
void recoveryTeamStock(String recoveryTeamStockKey, Integer validTime);
}

View File

@ -18,5 +18,6 @@ public class TradeLockRuleCommandEntity {
private String userId;
/** 活动ID */
private Long activityId;
/** 组队ID */
private String teamId;
}

View File

@ -17,4 +17,7 @@ public class TradeLockRuleFilterBackEntity {
// 用户参与活动的订单量
private Integer userTakeOrderCount;
// 恢复组队库存缓存key
private String recoveryTeamStockKey;
}

View File

@ -46,6 +46,7 @@ public class TradeLockLockOrderService implements ITradeLockOrderService {
TradeLockRuleFilterBackEntity tradeLockRuleFilterBackEntity = tradeRuleFilter.apply(TradeLockRuleCommandEntity.builder()
.activityId(payActivityEntity.getActivityId())
.userId(userEntity.getUserId())
.teamId(payActivityEntity.getTeamId())
.build(),
new TradeLockRuleFilterFactory.DynamicContext());
@ -60,8 +61,14 @@ public class TradeLockLockOrderService implements ITradeLockOrderService {
.userTakeOrderCount(userTakeOrderCount)
.build();
// 锁定聚合订单 - 这会用户只是下单还没有支付后续会有2个流程支付成功超时未支付回退
return repository.lockMarketPayOrder(groupBuyOrderAggregate);
try {
// 锁定聚合订单 - 这会用户只是下单还没有支付后续会有2个流程支付成功超时未支付回退
return repository.lockMarketPayOrder(groupBuyOrderAggregate);
} catch (Exception e) {
// 记录失败恢复量
repository.recoveryTeamStock(tradeLockRuleFilterBackEntity.getRecoveryTeamStockKey(), payActivityEntity.getValidTime());
throw e;
}
}
}

View File

@ -3,6 +3,7 @@ import edu.whut.domain.trade.model.entity.GroupBuyActivityEntity;
import edu.whut.domain.trade.model.entity.TradeLockRuleCommandEntity;
import edu.whut.domain.trade.model.entity.TradeLockRuleFilterBackEntity;
import edu.whut.domain.trade.service.lock.filter.ActivityUsabilityRuleFilter;
import edu.whut.domain.trade.service.lock.filter.TeamStockOccupyRuleFilter;
import edu.whut.domain.trade.service.lock.filter.UserTakeLimitRuleFilter;
import edu.whut.types.design.framework.link.model2.LinkArmory;
import edu.whut.types.design.framework.link.model2.chain.BusinessLinkedList;
@ -11,6 +12,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
@ -22,13 +24,17 @@ import org.springframework.stereotype.Service;
public class TradeLockRuleFilterFactory {
/**
* 组装责任链
* 通过 Spring @Bean 暴露外部只需注入 BusinessLinkedList 即可调用 apply
*/
@Bean("tradeRuleFilter")
public BusinessLinkedList<TradeLockRuleCommandEntity, DynamicContext, TradeLockRuleFilterBackEntity> tradeRuleFilter(ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter) {
public BusinessLinkedList<TradeLockRuleCommandEntity, DynamicContext, TradeLockRuleFilterBackEntity> tradeRuleFilter(
ActivityUsabilityRuleFilter activityUsabilityRuleFilter,
UserTakeLimitRuleFilter userTakeLimitRuleFilter,
TeamStockOccupyRuleFilter teamStockOccupyRuleFilter) {
// 1. 组装链
LinkArmory<TradeLockRuleCommandEntity, DynamicContext, TradeLockRuleFilterBackEntity> linkArmory =
new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter);
new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter,teamStockOccupyRuleFilter);
// 2. 返回链容器即可作为责任链使用
return linkArmory.getLogicLink();
@ -43,8 +49,22 @@ public class TradeLockRuleFilterFactory {
@NoArgsConstructor
public static class DynamicContext {
/** 拼团活动信息,供后续节点复用 */
private String teamOccupiedStockKey = "group_buy_market_team_occupied_stock_key_";
private GroupBuyActivityEntity groupBuyActivity;
private Integer userTakeOrderCount;
public String generateTeamStockKey(String teamId) {
if (StringUtils.isBlank(teamId)) return null;
return teamOccupiedStockKey + groupBuyActivity.getActivityId() + "_" + teamId;
}
public String generateRecoveryTeamStockKey(String teamId) {
if (StringUtils.isBlank(teamId)) return null;
return teamOccupiedStockKey + groupBuyActivity.getActivityId() + "_" + teamId + "_recovery";
}
}
}

View File

@ -0,0 +1,59 @@
package edu.whut.domain.trade.service.lock.filter;
import edu.whut.domain.trade.adapter.repository.ITradeRepository;
import edu.whut.domain.trade.model.entity.GroupBuyActivityEntity;
import edu.whut.domain.trade.model.entity.TradeLockRuleCommandEntity;
import edu.whut.domain.trade.model.entity.TradeLockRuleFilterBackEntity;
import edu.whut.domain.trade.service.lock.factory.TradeLockRuleFilterFactory;
import edu.whut.types.design.framework.link.model2.handler.ILogicHandler;
import edu.whut.types.enums.ResponseCode;
import edu.whut.types.exception.AppException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 组队库存占用规则过滤
*/
@Slf4j
@Service
public class TeamStockOccupyRuleFilter implements ILogicHandler<TradeLockRuleCommandEntity, TradeLockRuleFilterFactory.DynamicContext, TradeLockRuleFilterBackEntity> {
@Resource
private ITradeRepository repository;
@Override
public TradeLockRuleFilterBackEntity apply(TradeLockRuleCommandEntity requestParameter, TradeLockRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
log.info("交易规则过滤-组队库存校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId());
// 1. teamId 为空则为首次开团不做拼团组队目标量库存限制
String teamId = requestParameter.getTeamId();
if (StringUtils.isBlank(teamId)) {
return TradeLockRuleFilterBackEntity.builder()
.userTakeOrderCount(dynamicContext.getUserTakeOrderCount())
.build();
}
// 2. 参与拼团 抢占库存通过抢占 Redis 缓存库存来降低对数据库的操作压力
GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity();
Integer target = groupBuyActivity.getTarget();
Integer validTime = groupBuyActivity.getValidTime();
String teamOccupiedStockKey = dynamicContext.generateTeamStockKey(teamId);
String recoveryTeamStockKey = dynamicContext.generateRecoveryTeamStockKey(teamId);
//抢占库存
boolean status = repository.occupyTeamStock(teamOccupiedStockKey, recoveryTeamStockKey, target, validTime);
if (!status) {
log.warn("交易规则过滤-组队库存校验{} activityId:{} 抢占失败:{}", requestParameter.getUserId(), requestParameter.getActivityId(), teamOccupiedStockKey);
throw new AppException(ResponseCode.E0008);
}
return TradeLockRuleFilterBackEntity.builder()
.userTakeOrderCount(dynamicContext.getUserTakeOrderCount())
.recoveryTeamStockKey(recoveryTeamStockKey)
.build();
}
}

View File

@ -36,9 +36,10 @@ public class UserTakeLimitRuleFilter implements ILogicHandler<TradeLockRuleComma
throw new AppException(ResponseCode.E0103);
}
return TradeLockRuleFilterBackEntity.builder()
.userTakeOrderCount(count)
.build();
dynamicContext.setUserTakeOrderCount(count);
// 走到下一个责任链节点
return next(requestParameter, dynamicContext);
}
}

View File

@ -18,6 +18,7 @@ import edu.whut.infrastructure.dao.po.GroupBuyOrder;
import edu.whut.infrastructure.dao.po.GroupBuyOrderList;
import edu.whut.infrastructure.dao.po.NotifyTask;
import edu.whut.infrastructure.dcc.DCCService;
import edu.whut.infrastructure.redis.IRedisService;
import edu.whut.types.common.Constants;
import edu.whut.types.enums.ActivityStatusEnumVO;
import edu.whut.types.enums.GroupBuyOrderStatusEnumVO;
@ -35,6 +36,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -59,6 +61,8 @@ public class TradeRepository implements ITradeRepository {
@Value("${spring.rabbitmq.config.producer.topic_team_success.routing_key}")
private String topic_team_success;
private final IRedisService redisService;
/**
* 根据外部交易号 & 用户id 查询未支付的锁单记录用于幂等
*/
@ -399,4 +403,47 @@ public class TradeRepository implements ITradeRepository {
public int updateNotifyTaskStatusRetry(String teamId) {
return notifyTaskDao.updateNotifyTaskStatusRetry(teamId);
}
/**
* 占用库存
*/
@Override
public boolean occupyTeamStock(String teamOccupiedStockKey, String recoveryTeamStockKey, Integer target, Integer validTime) {
// 获取失败恢复量
Long recoveryCount = redisService.getAtomicLong(recoveryTeamStockKey);
recoveryCount = null == recoveryCount ? 0 : recoveryCount;
// 1. incr 得到值与总量和恢复量做对比恢复量为系统失败时候记录的量
// 2. 从有组队量开始相当于已经有了一个占用量所以要 +1因为团长开团的时候teamid为null但事实上锁单已经有一单了
long occupy = redisService.incr(teamOccupiedStockKey) + 1; //取teamOccupiedStockKey的值先自增再返回类似++i
if (occupy > target + recoveryCount) {
redisService.setAtomicLong(teamOccupiedStockKey, target);
return false;
}
// 4) 如果用到了补偿名额序号已经 > target就从补偿池里减掉一个;
//if (occupySeq > target) {
// redisService.decr(recoveryKey);
//}
// 1. 给每个产生的值加锁为兜底设计虽然incr操作是原子的基本不会产生一样的值但在实际生产中遇到过集群的运维配置问题以及业务运营配置数据问题导致incr得到的值相同
// 2. validTime + 60分钟是一个延后时间的设计让数据保留时间稍微长一些便于排查问题
String lockKey = teamOccupiedStockKey + Constants.UNDERLINE + occupy;
Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES);
if (!lock) {
log.info("组队库存加锁失败 {}", lockKey);
}
return lock;
}
@Override
public void recoveryTeamStock(String recoveryteamOccupiedStockKey, Integer validTime) {
// 首次组队拼团是没有 teamId 所以不需要这个做处理
if (StringUtils.isBlank(recoveryteamOccupiedStockKey)) return;
redisService.incr(recoveryteamOccupiedStockKey);
}
}

View File

@ -3,9 +3,12 @@ import com.alibaba.fastjson.JSON;
import edu.whut.domain.trade.service.ITradeSettlementOrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 拼团完结回调通知任务拼团回调任务表实际公司场景会定时清理数据结转不会有太多数据挤压
@ -17,14 +20,27 @@ public class GroupBuyNotifyJob {
private final ITradeSettlementOrderService tradeSettlementOrderService;
//每30秒执行一次
@Scheduled(cron = "0/30 * * * * ?")
private final RedissonClient redissonClient;
//每天零点执行一次
@Scheduled(cron = "0 0 0 * * ?")
public void exec() {
// 为什么加锁分布式应用N台机器部署互备一个应用实例挂了还有另外可用的任务调度会有N个同时执行那么这里需要增加抢占机制谁抢占到谁就执行完毕后下一轮继续抢占
// 获取锁句柄并未真正获取锁
RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec");
try {
Map<String, Integer> result = tradeSettlementOrderService. execSettlementNotifyJob();
//尝试获取锁 waitTime = 3:如果当前锁已经被别人持有调用线程最多等待 3 秒去重试获取;leaseTime = 0:不设过期时间看门狗机制
boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS);
if (!isLocked) return;
Map<String, Integer> result = tradeSettlementOrderService.execSettlementNotifyJob();
log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result));
} catch (Exception e) {
log.error("定时任务,回调通知拼团完结任务失败", e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

View File

@ -23,6 +23,8 @@ public enum ResponseCode {
E0005("E0005", "拼团组队失败记录更新为0"),
E0006("E0006", "拼团组队完结,锁单量已达成"),
E0007("E0007", "拼团人群限定,不可参与"),
E0008("E0008", "拼团组队失败,缓存库存不足"),
E0101("E0101", "拼团活动未生效"),
E0102("E0102", "不在拼团活动有效时间内"),
E0103("E0103", "当前用户参与此拼团次数已达上限"),