7.5 拼团成功后回调通知功能实现 springtask作兜底

This commit is contained in:
zhangsan 2025-07-05 13:55:28 +08:00
parent a8502d1dae
commit 7944d5769b
32 changed files with 616 additions and 33 deletions

View File

@ -159,6 +159,7 @@ CREATE TABLE `group_buy_order` (
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`valid_start_time` datetime NOT NULL COMMENT '拼团开始时间',
`valid_end_time` datetime NOT NULL COMMENT '拼团结束时间',
`notify_url` varchar(512) NOT NULL COMMENT '回调地址',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uq_team_id`(`team_id` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '拼团订单表' ROW_FORMAT = Dynamic;
@ -217,7 +218,8 @@ CREATE TABLE `notify_task` (
`parameter_json` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '参数对象',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uq_team_id` (`team_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------

View File

@ -22,5 +22,7 @@ public class LockMarketPayOrderRequestDTO {
private String channel;
// 外部交易单号
private String outTradeNo;
// 回调地址
private String notifyUrl;
}

View File

@ -0,0 +1,18 @@
package edu.whut.api.dto;
import lombok.Data;
import java.util.List;
/**
* 回调请求对象
*/
@Data
public class NotifyRequestDTO {
/** 组队ID */
private String teamId;
/** 外部单号 */
private List<String> outTradeNoList;
}

View File

@ -79,6 +79,16 @@
<version>2.9.0</version>
</dependency>
<!-- http 接口框架 https://bugstack.cn/md/road-map/http.html -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
</dependency>
<!-- 工程模块;启动依赖 trigger->domain, infrastructure-->
<dependency>
<groupId>edu.whut</groupId>

View File

@ -3,9 +3,11 @@ package edu.whut;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@Configurable
@EnableScheduling
public class Application {
public static void main(String[] args){

View File

@ -0,0 +1,18 @@
package edu.whut.config;
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* http 框架
*/
@Configuration
public class OKHttpClientConfig {
@Bean
public OkHttpClient httpClient() {
return new OkHttpClient();
}
}

View File

@ -17,6 +17,7 @@
<result column="status" property="status"/>
<result column="valid_start_time" property="validStartTime"/>
<result column="valid_end_time" property="validEndTime"/>
<result column="notify_url" property="notifyUrl"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
@ -25,10 +26,10 @@
insert into group_buy_order(
team_id, activity_id, source, channel, original_price,
deduction_price, pay_price, target_count, complete_count,
lock_count, status, valid_start_time, valid_end_time, create_time, update_time
lock_count, status, valid_start_time, valid_end_time, notify_url, create_time, update_time
) values(
#{teamId}, #{activityId}, #{source}, #{channel}, #{originalPrice},
#{deductionPrice}, #{payPrice}, #{targetCount}, #{completeCount}, #{lockCount}, 0, #{validStartTime}, #{validEndTime} ,now(), now()
#{deductionPrice}, #{payPrice}, #{targetCount}, #{completeCount}, #{lockCount}, 0, #{validStartTime}, #{validEndTime} ,#{notifyUrl},now(), now()
)
</insert>
@ -66,7 +67,7 @@
</select>
<select id="queryGroupBuyTeamByTeamId" parameterType="java.lang.String" resultMap="dataMap">
select team_id, activity_id, target_count, complete_count, lock_count, status, valid_start_time, valid_end_time
select team_id, activity_id, target_count, complete_count, lock_count, status, valid_start_time, valid_end_time,notify_url
from group_buy_order where team_id = #{teamId}
</select>

View File

@ -20,4 +20,35 @@
values(#{activityId}, #{teamId}, #{notifyUrl}, #{notifyCount}, #{notifyStatus}, #{parameterJson}, now(),now())
</insert>
<select id="queryUnExecutedNotifyTaskList" resultMap="dataMap">
select activity_id, team_id, notify_url, notify_count, notify_status, parameter_json
from notify_task
where notify_status in (0, 2)
limit 50
</select>
<select id="queryUnExecutedNotifyTaskByTeamId" resultMap="dataMap">
select activity_id, team_id, notify_url, notify_count, notify_status, parameter_json
from notify_task
where team_id = #{teamId} and notify_status in (0, 2)
</select>
<update id="updateNotifyTaskStatusSuccess" parameterType="java.lang.String">
update notify_task
set notify_count = notify_count + 1, notify_status = 1, update_time = now()
where team_id = #{teamId}
</update>
<update id="updateNotifyTaskStatusError" parameterType="java.lang.String">
update notify_task
set notify_count = notify_count + 1, notify_status = 3, update_time = now()
where team_id = #{teamId}
</update>
<update id="updateNotifyTaskStatusRetry" parameterType="java.lang.String">
update notify_task
set notify_count = notify_count + 1, notify_status = 2, update_time = now()
where team_id = #{teamId}
</update>
</mapper>

View File

@ -24,14 +24,18 @@ public class TradeSettlementOrderServiceTest {
@Resource
private ITradeSettlementOrderService tradeSettlementOrderService;
/**
* 模拟拼团交易结算
* @throws Exception
*/
@Test
public void test_settlementMarketPayOrder() throws Exception {
//模拟支付成功后传入的信息
TradePaySuccessEntity tradePaySuccessEntity = new TradePaySuccessEntity();
tradePaySuccessEntity.setSource("s02");
tradePaySuccessEntity.setChannel("c02");
tradePaySuccessEntity.setUserId("zy03");
tradePaySuccessEntity.setOutTradeNo("251255361034");
tradePaySuccessEntity.setSource("s01");
tradePaySuccessEntity.setChannel("c01");
tradePaySuccessEntity.setUserId("zy01");
tradePaySuccessEntity.setOutTradeNo("451247062978");
tradePaySuccessEntity.setOutTradeTime(new Date()); //交易时间
TradePaySettlementEntity tradePaySettlementEntity = tradeSettlementOrderService.settlementMarketPayOrder(tradePaySuccessEntity);
log.info("请求参数:{}", JSON.toJSONString(tradePaySuccessEntity));

View File

@ -0,0 +1,49 @@
package edu.whut.test.infrastructure.gateway;
import edu.whut.infrastructure.gateway.GroupBuyNotifyService;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.IOException;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GroupBuyNotifyServiceTest {
@Resource
private GroupBuyNotifyService groupBuyNotifyService;
@Test
public void test_notify_api() throws Exception {
String notifyRequestDTOJSON = "{\"teamId\":\"57199993\",\"outTradeNoList\":\"038426231487,652896391719,619401409195\"}";
String notifyRequestDTOJSON2 = "{\"teamId\":\"57199993\",\"outTradeNoList\":[\"038426231487\",\"652896391719\",\"619401409195\"]}";
String response = groupBuyNotifyService.groupBuyNotify("http://127.0.0.1:8091/api/v1/test/group_buy_notify", notifyRequestDTOJSON2);
log.info("测试结果:{}", response);
}
@Test
public void test() throws IOException {
OkHttpClient client = new OkHttpClient();
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, "{\"teamId\":\"57199993\",\"outTradeNoList\":\"038426231487,652896391719,619401409195\"}");
Request request = new Request.Builder()
.url("http://127.0.0.1:8091/api/v1/test/group_buy_notify")
.post(body)
.addHeader("content-type", "application/json")
.build();
Response response = client.newCall(request).execute();
log.info("测试结果:{}", response);
}
}

View File

@ -25,15 +25,19 @@ public class MarketTradeControllerTest {
private IMarketTradeService marketTradeService;
/**
* 模拟第一个用户创建拼团
*/
@Test
public void test_lockMarketPayOrder() {
LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO();
lockMarketPayOrderRequestDTO.setUserId("zy01");
lockMarketPayOrderRequestDTO.setTeamId(null);
lockMarketPayOrderRequestDTO.setActivityId(100123L);
lockMarketPayOrderRequestDTO.setActivityId(100124L);
lockMarketPayOrderRequestDTO.setGoodsId("9890001");
lockMarketPayOrderRequestDTO.setSource("s01");
lockMarketPayOrderRequestDTO.setChannel("c01");
lockMarketPayOrderRequestDTO.setNotifyUrl("http://127.0.0.1:8091/api/v1/test/group_buy_notify");
lockMarketPayOrderRequestDTO.setOutTradeNo(RandomStringUtils.randomNumeric(12));
Response<LockMarketPayOrderResponseDTO> lockMarketPayOrderResponseDTOResponse = marketTradeService.lockMarketPayOrder(lockMarketPayOrderRequestDTO);
@ -41,6 +45,9 @@ public class MarketTradeControllerTest {
log.info("测试结果 req:{} res:{}", JSON.toJSONString(lockMarketPayOrderRequestDTO), JSON.toJSONString(lockMarketPayOrderResponseDTOResponse));
}
/**
* 模拟其他人加入拼团
*/
@Test
public void test_lockMarketPayOrder_teamId_not_null() {
LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO = new LockMarketPayOrderRequestDTO();
@ -50,6 +57,7 @@ public class MarketTradeControllerTest {
lockMarketPayOrderRequestDTO.setGoodsId("9890001");
lockMarketPayOrderRequestDTO.setSource("s01");
lockMarketPayOrderRequestDTO.setChannel("c01");
lockMarketPayOrderRequestDTO.setNotifyUrl("http://127.0.0.1:8091/api/v1/test/group_buy_notify");
lockMarketPayOrderRequestDTO.setOutTradeNo(RandomStringUtils.randomNumeric(12));
Response<LockMarketPayOrderResponseDTO> lockMarketPayOrderResponseDTOResponse = marketTradeService.lockMarketPayOrder(lockMarketPayOrderRequestDTO);

View File

@ -0,0 +1,12 @@
package edu.whut.domain.trade.adapter.port;
import edu.whut.domain.trade.model.entity.NotifyTaskEntity;
/**
* 交易接口服务接口
*/
public interface ITradePort {
String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception;
}

View File

@ -5,8 +5,11 @@ import edu.whut.domain.trade.model.aggregate.GroupBuyTeamSettlementAggregate;
import edu.whut.domain.trade.model.entity.GroupBuyActivityEntity;
import edu.whut.domain.trade.model.entity.GroupBuyTeamEntity;
import edu.whut.domain.trade.model.entity.MarketPayOrderEntity;
import edu.whut.domain.trade.model.entity.NotifyTaskEntity;
import edu.whut.domain.trade.model.valobj.GroupBuyProgressVO;
import java.util.List;
/**
* 交易仓储服务接口
*/
@ -24,7 +27,17 @@ public interface ITradeRepository {
GroupBuyTeamEntity queryGroupBuyTeamByTeamId(String teamId);
void settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate);
boolean settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate);
boolean isSCBlackIntercept(String source, String channel);
List<NotifyTaskEntity> queryUnExecutedNotifyTaskList();
List<NotifyTaskEntity> queryUnExecutedNotifyTaskList(String teamId);
int updateNotifyTaskStatusSuccess(String teamId);
int updateNotifyTaskStatusError(String teamId);
int updateNotifyTaskStatusRetry(String teamId);
}

View File

@ -32,5 +32,7 @@ public class GroupBuyTeamEntity {
private Date validStartTime;
/** 拼团结束时间 - 拼团有效时长 */
private Date validEndTime;
/** 回调地址 */
private String notifyUrl;
}

View File

@ -0,0 +1,38 @@
package edu.whut.domain.trade.model.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 回调任务实体
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotifyTaskEntity {
/**
* 拼单组队ID
*/
private String teamId;
/**
* 回调接口
*/
private String notifyUrl;
/**
* 回调次数
*/
private Integer notifyCount;
/**
* 参数对象
*/
private String parameterJson;
public String lockKey() {
return "notify_job_lock_key_" + this.teamId;
}
}

View File

@ -32,5 +32,6 @@ public class PayDiscountEntity {
private BigDecimal payPrice;
/** 外部交易单号-确保外部调用唯一幂等 */
private String outTradeNo;
/** 回调地址 */
private String notifyUrl;
}

View File

@ -32,5 +32,7 @@ public class TradeSettlementRuleFilterBackEntity {
private Date validStartTime;
/** 拼团结束时间 - 拼团有效时长 */
private Date validEndTime;
/** 回调地址 */
private String notifyUrl;
}

View File

@ -3,6 +3,8 @@ package edu.whut.domain.trade.service;
import edu.whut.domain.trade.model.entity.TradePaySettlementEntity;
import edu.whut.domain.trade.model.entity.TradePaySuccessEntity;
import java.util.Map;
/**
* 拼团交易结算服务接口
*/
@ -15,4 +17,22 @@ public interface ITradeSettlementOrderService {
*/
TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity) throws Exception;
/**
* 执行结算通知任务
*
* @return 结算数量
* @throws Exception 异常
*/
Map<String, Integer> execSettlementNotifyJob() throws Exception;
/**
* 执行结算通知任务
*
* @param teamId 指定结算组ID
* @return 结算数量
* @throws Exception 异常
*/
Map<String, Integer> execSettlementNotifyJob(String teamId) throws Exception;
}

View File

@ -1,16 +1,21 @@
package edu.whut.domain.trade.service.settlement;
import com.alibaba.fastjson.JSON;
import edu.whut.domain.trade.adapter.port.ITradePort;
import edu.whut.domain.trade.adapter.repository.ITradeRepository;
import edu.whut.domain.trade.model.aggregate.GroupBuyTeamSettlementAggregate;
import edu.whut.domain.trade.model.entity.*;
import edu.whut.domain.trade.service.ITradeSettlementOrderService;
import edu.whut.domain.trade.service.settlement.factory.TradeSettlementRuleFilterFactory;
import edu.whut.types.design.framework.link.model2.chain.BusinessLinkedList;
import edu.whut.types.enums.NotifyTaskHTTPEnumVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 拼团交易结算服务
@ -21,6 +26,8 @@ import javax.annotation.Resource;
public class TradeSettlementOrderService implements ITradeSettlementOrderService {
private final ITradeRepository repository;
private final ITradePort port;
private final BusinessLinkedList<TradeSettlementRuleCommandEntity, TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> tradeSettlementRuleFilter;
@ -37,7 +44,7 @@ public class TradeSettlementOrderService implements ITradeSettlementOrderService
BeanUtils.copyProperties(tradePaySuccessEntity, command);
TradeSettlementRuleFilterBackEntity tradeSettlementRuleFilterBackEntity = tradeSettlementRuleFilter.apply(
command, new TradeSettlementRuleFilterFactory.DynamicContext());
String teamId=tradeSettlementRuleFilterBackEntity.getTeamId();
// 2. 查询组团信息
GroupBuyTeamEntity groupBuyTeamEntity = new GroupBuyTeamEntity();
BeanUtils.copyProperties(tradeSettlementRuleFilterBackEntity, groupBuyTeamEntity);
@ -50,17 +57,86 @@ public class TradeSettlementOrderService implements ITradeSettlementOrderService
.build();
// 4. 更新数据库拼团交易结算
repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate);
boolean isNotify =repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate);
// 5. 返回结算信息 - 公司中开发这样的流程时候会根据外部需要进行值的设置
// 5. 拼团成功后自动进行组队回调处理 - 处理失败也会有定时任务补偿通过这样的方式可以减轻任务调度提高时效性
if (isNotify) {
Map<String, Integer> notifyResultMap = execSettlementNotifyJob(teamId);
log.info("回调通知 拼团成功 result:{}", JSON.toJSONString(notifyResultMap));
}
// 6. 返回结算信息 - 公司中开发这样的流程时候会根据外部需要进行值的设置
return TradePaySettlementEntity.builder()
.source(tradePaySuccessEntity.getSource())
.channel(tradePaySuccessEntity.getChannel())
.userId(tradePaySuccessEntity.getUserId())
.teamId(tradeSettlementRuleFilterBackEntity.getTeamId())
.teamId(teamId)
.activityId(groupBuyTeamEntity.getActivityId())
.outTradeNo(tradePaySuccessEntity.getOutTradeNo())
.build();
}
/**
* 由定时任务触发定时扫描并执行所有未执行的结算通知任务
*/
@Override
public Map<String, Integer> execSettlementNotifyJob() throws Exception {
log.info("拼团交易-执行结算通知任务");
// 查询未执行任务
List<NotifyTaskEntity> notifyTaskEntityList = repository.queryUnExecutedNotifyTaskList();
return execSettlementNotifyJob(notifyTaskEntityList);
}
/**
* 拼团成功后触发传入 teamId执行该 team 下的结算通知回调
*/
@Override
public Map<String, Integer> execSettlementNotifyJob(String teamId) throws Exception {
log.info("拼团交易-执行结算通知回调,指定 teamId:{}", teamId);
List<NotifyTaskEntity> notifyTaskEntityList = repository.queryUnExecutedNotifyTaskList(teamId);
return execSettlementNotifyJob(notifyTaskEntityList);
}
/**
*公共逻辑抽取遍历任务列表调用外部服务更新数据库并计数
* @param notifyTaskEntityList 待处理的通知任务列表
* @return key任务总量successCounterrorCountretryCount
*/
private Map<String, Integer> execSettlementNotifyJob(List<NotifyTaskEntity> notifyTaskEntityList) throws Exception {
int successCount = 0, errorCount = 0, retryCount = 0;
for (NotifyTaskEntity notifyTask : notifyTaskEntityList) {
// 回调处理 success 成功error 失败
String response = port.groupBuyNotify(notifyTask);
// 更新状态判断&变更数据库表回调任务状态
if (NotifyTaskHTTPEnumVO.SUCCESS.getCode().equals(response)) {
int updateCount = repository.updateNotifyTaskStatusSuccess(notifyTask.getTeamId());
if (1 == updateCount) {
successCount += 1;
}
} else if (NotifyTaskHTTPEnumVO.ERROR.getCode().equals(response)) {
if (notifyTask.getNotifyCount() < 5) {
// 失败但可以重试 标记为 RETRY等待下一次收集 待处理的通知任务列表
if (repository.updateNotifyTaskStatusRetry(notifyTask.getTeamId()) == 1) {
retryCount++;
}
} else {
// 已达最大重试次数 标记为 ERROR不再重试
if (repository.updateNotifyTaskStatusError(notifyTask.getTeamId()) == 1) {
errorCount++;
}
}
}
}
Map<String, Integer> resultMap = new HashMap<>();
resultMap.put("totalCount", notifyTaskEntityList.size());
resultMap.put("successCount", successCount);
resultMap.put("errorCount", errorCount);
resultMap.put("retryCount", retryCount);
return resultMap;
}
}

View File

@ -6,6 +6,7 @@ import edu.whut.domain.trade.model.entity.TradeSettlementRuleFilterBackEntity;
import edu.whut.domain.trade.service.settlement.factory.TradeSettlementRuleFilterFactory;
import edu.whut.types.design.framework.link.model2.handler.ILogicHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
/**
@ -22,17 +23,10 @@ public class EndRuleFilter implements ILogicHandler<TradeSettlementRuleCommandEn
// 获取上下文对象
GroupBuyTeamEntity groupBuyTeamEntity = dynamicContext.getGroupBuyTeamEntity();
TradeSettlementRuleFilterBackEntity back = new TradeSettlementRuleFilterBackEntity();
BeanUtils.copyProperties(groupBuyTeamEntity, back);
// 返回封装数据
return TradeSettlementRuleFilterBackEntity.builder()
.teamId(groupBuyTeamEntity.getTeamId())
.activityId(groupBuyTeamEntity.getActivityId())
.targetCount(groupBuyTeamEntity.getTargetCount())
.completeCount(groupBuyTeamEntity.getCompleteCount())
.lockCount(groupBuyTeamEntity.getLockCount())
.status(groupBuyTeamEntity.getStatus())
.validStartTime(groupBuyTeamEntity.getValidStartTime())
.validEndTime(groupBuyTeamEntity.getValidEndTime())
.build();
return back;
}
}

View File

@ -24,6 +24,11 @@
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
</dependency>
<!-- 系统模块 -->
<dependency>
<groupId>edu.whut</groupId>

View File

@ -0,0 +1,50 @@
package edu.whut.infrastructure.adapter.port;
import edu.whut.domain.trade.adapter.port.ITradePort;
import edu.whut.domain.trade.model.entity.NotifyTaskEntity;
import edu.whut.infrastructure.gateway.GroupBuyNotifyService;
import edu.whut.infrastructure.redis.IRedisService;
import edu.whut.types.enums.NotifyTaskHTTPEnumVO;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
public class TradePort implements ITradePort {
private final GroupBuyNotifyService groupBuyNotifyService;
private final IRedisService redisService;
@Override
public String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception {
RLock lock = redisService.getLock(notifyTask.lockKey());
try {
// group-buy-market 拼团服务端会被部署到多台应用服务器上那么就会有很多任务一起执行这个时候要进行抢占避免被多次执行
// tryLock(long waitTime, long leaseTime, TimeUnit unit) 最大等待时间锁自动释放时间传0表示需手动释放
if (lock.tryLock(3, 0, TimeUnit.SECONDS)) {
try {
// 无效的 notifyUrl 则直接返回成功
if (StringUtils.isBlank(notifyTask.getNotifyUrl()) || "暂无".equals(notifyTask.getNotifyUrl())) {
return NotifyTaskHTTPEnumVO.SUCCESS.getCode();
}
return groupBuyNotifyService.groupBuyNotify(notifyTask.getNotifyUrl(), notifyTask.getParameterJson());
} finally {
//解锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
return NotifyTaskHTTPEnumVO.NULL.getCode();
} catch (Exception e) {
Thread.currentThread().interrupt();
return NotifyTaskHTTPEnumVO.NULL.getCode();
}
}
}

View File

@ -29,11 +29,9 @@ import org.springframework.beans.BeanUtils;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.*;
/**
@ -122,6 +120,7 @@ public class TradeRepository implements ITradeRepository {
.lockCount(1) // 首单已锁定
.validStartTime(currentDate)
.validEndTime(calendar.getTime())
.notifyUrl(discount.getNotifyUrl())
.build();
groupBuyOrderDao.insert(orderPo);
@ -235,7 +234,7 @@ public class TradeRepository implements ITradeRepository {
*/
@Transactional(timeout = 500)
@Override
public void settlementMarketPayOrder(GroupBuyTeamSettlementAggregate agg) {
public boolean settlementMarketPayOrder(GroupBuyTeamSettlementAggregate agg) {
// ========= 1. 聚合拆分 =========
UserEntity user = agg.getUserEntity();
@ -274,7 +273,7 @@ public class TradeRepository implements ITradeRepository {
NotifyTask task = new NotifyTask();
task.setActivityId(team.getActivityId());
task.setTeamId(team.getTeamId());
task.setNotifyUrl("暂无"); // TODO待配置回调地址
task.setNotifyUrl(team.getNotifyUrl());
task.setNotifyCount(0);
task.setNotifyStatus(0);
task.setParameterJson(JSON.toJSONString(new HashMap<String, Object>() {{
@ -283,11 +282,83 @@ public class TradeRepository implements ITradeRepository {
}}));
notifyTaskDao.insert(task);
return true;
}
return false;
}
/**
* 渠道是否被拦截
* @param source
* @param channel
* @return
*/
@Override
public boolean isSCBlackIntercept(String source, String channel) {
return dccService.isSCBlackIntercept(source, channel);
}
/**
* 查询 所有 未执行或待重试的通知任务
* @return 如果不存在相关任务则返回空列表否则返回实体列表
*/
@Override
public List<NotifyTaskEntity> queryUnExecutedNotifyTaskList() {
List<NotifyTask> notifyTaskList = notifyTaskDao.queryUnExecutedNotifyTaskList();
if (CollectionUtils.isEmpty(notifyTaskList)) {
return Collections.emptyList();
}
List<NotifyTaskEntity> notifyTaskEntities = new ArrayList<>();
for (NotifyTask notifyTask : notifyTaskList) {
NotifyTaskEntity notifyTaskEntity = new NotifyTaskEntity();
// 将同名属性从 task 拷贝到 entity
BeanUtils.copyProperties(notifyTask, notifyTaskEntity);
notifyTaskEntities.add(notifyTaskEntity);
}
return notifyTaskEntities;
}
/**
* 根据指定 teamId 查询未执行或待重试的单个通知任务
* @param teamId 拼团团队标识
* @return 返回长度为 0 1 的列表封装 NotifyTaskEntity
*/
@Override
public List<NotifyTaskEntity> queryUnExecutedNotifyTaskList(String teamId) {
NotifyTask notifyTask = notifyTaskDao.queryUnExecutedNotifyTaskByTeamId(teamId);
if (null == notifyTask) return new ArrayList<>();
return Collections.singletonList(NotifyTaskEntity.builder()
.teamId(notifyTask.getTeamId())
.notifyUrl(notifyTask.getNotifyUrl())
.notifyCount(notifyTask.getNotifyCount())
.parameterJson(notifyTask.getParameterJson())
.build());
}
/**
* 更新指定 teamId 的通知任务状态为成功
*/
@Override
public int updateNotifyTaskStatusSuccess(String teamId) {
return notifyTaskDao.updateNotifyTaskStatusSuccess(teamId);
}
/**
* 更新指定 teamId 的通知任务状态为失败不可重试
*/
@Override
public int updateNotifyTaskStatusError(String teamId) {
return notifyTaskDao.updateNotifyTaskStatusError(teamId);
}
/**
* 更新指定 teamId 的通知任务状态为重试
*/
@Override
public int updateNotifyTaskStatusRetry(String teamId) {
return notifyTaskDao.updateNotifyTaskStatusRetry(teamId);
}
}

View File

@ -2,6 +2,8 @@ package edu.whut.infrastructure.dao;
import edu.whut.infrastructure.dao.po.NotifyTask;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* 回调任务
*/
@ -10,4 +12,14 @@ public interface INotifyTaskDao {
void insert(NotifyTask notifyTask);
List<NotifyTask> queryUnExecutedNotifyTaskList();
NotifyTask queryUnExecutedNotifyTaskByTeamId(String teamId);
int updateNotifyTaskStatusSuccess(String teamId);
int updateNotifyTaskStatusError(String teamId);
int updateNotifyTaskStatusRetry(String teamId);
}

View File

@ -45,6 +45,8 @@ public class GroupBuyOrder {
private Date validStartTime;
/** 拼团结束时间 - 拼团有效时长 */
private Date validEndTime;
/** 回调通知地址 */
private String notifyUrl;
/** 创建时间 */
private Date createTime;
/** 更新时间 */

View File

@ -0,0 +1,43 @@
package edu.whut.infrastructure.gateway;
import edu.whut.types.enums.ResponseCode;
import edu.whut.types.exception.AppException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 拼团回调服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GroupBuyNotifyService {
private final OkHttpClient okHttpClient;
public String groupBuyNotify(String apiUrl, String notifyRequestDTOJSON) throws Exception {
try {
// 1. 构建参数
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, notifyRequestDTOJSON);
Request request = new Request.Builder()
.url(apiUrl)
.post(body)
.addHeader("content-type", "application/json")
.build();
// 2. 调用接口
Response response = okHttpClient.newCall(request).execute();
// 3. 返回结果
return response.body().string();
} catch (Exception e) {
log.error("拼团回调 HTTP 接口服务异常 {}", apiUrl, e);
throw new AppException(ResponseCode.HTTP_EXCEPTION);
}
}
}

View File

@ -67,11 +67,12 @@ public class MarketTradeController implements IMarketTradeService {
Long activityId = lockMarketPayOrderRequestDTO.getActivityId();
String outTradeNo = lockMarketPayOrderRequestDTO.getOutTradeNo();
String teamId = lockMarketPayOrderRequestDTO.getTeamId(); //可为空
String notifyUrl = lockMarketPayOrderRequestDTO.getNotifyUrl();
log.info("拼团交易锁单入参 userId={} req={}", userId, JSON.toJSONString(lockMarketPayOrderRequestDTO));
// 空值校验任何一个关键字段为空则直接返回错误
if (StringUtils.isAnyBlank(userId, source, channel, goodsId) || activityId == null) {
if (StringUtils.isAnyBlank(userId, source, channel, goodsId, notifyUrl) || activityId == null) {
return Response.<LockMarketPayOrderResponseDTO>builder()
.code(ResponseCode.ILLEGAL_PARAMETER.getCode())
.info(ResponseCode.ILLEGAL_PARAMETER.getInfo())
@ -143,6 +144,7 @@ public class MarketTradeController implements IMarketTradeService {
.deductionPrice(trialBalance.getDeductionPrice())
.payPrice(trialBalance.getPayPrice())
.outTradeNo(outTradeNo)
.notifyUrl(notifyUrl)
.build());
log.info("交易锁单成功 userId={} order={}", userId, JSON.toJSONString(marketPayOrderEntity));

View File

@ -0,0 +1,29 @@
package edu.whut.trigger.http;
import com.alibaba.fastjson2.JSON;
import edu.whut.api.dto.NotifyRequestDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
* 回调服务接口测试
*/
@Slf4j
@RestController()
@CrossOrigin("*")
@RequestMapping("/api/v1/test")
public class TestApiClientController {
/**
* 模拟回调案例
*
* @param notifyRequestDTO 通知回调参数
* @return success 成功error 失败
*/
@PostMapping("/group_buy_notify")
public String groupBuyNotify(@RequestBody NotifyRequestDTO notifyRequestDTO) {
log.info("模拟测试第三方服务接收拼团回调 {}", JSON.toJSONString(notifyRequestDTO));
return "success";
}
}

View File

@ -0,0 +1,31 @@
package edu.whut.trigger.job;
import com.alibaba.fastjson.JSON;
import edu.whut.domain.trade.service.ITradeSettlementOrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* 拼团完结回调通知任务拼团回调任务表实际公司场景会定时清理数据结转不会有太多数据挤压
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GroupBuyNotifyJob {
private final ITradeSettlementOrderService tradeSettlementOrderService;
//每15秒执行一次
@Scheduled(cron = "0/15 * * * * ?")
public void exec() {
try {
Map<String, Integer> result = tradeSettlementOrderService. execSettlementNotifyJob();
log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result));
} catch (Exception e) {
log.error("定时任务,回调通知拼团完结任务失败", e);
}
}
}

View File

@ -0,0 +1,23 @@
package edu.whut.types.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* 回调任务状态
*/
@Getter
@AllArgsConstructor
@NoArgsConstructor
public enum NotifyTaskHTTPEnumVO {
SUCCESS("success", "成功"),
ERROR("error", "失败"),
NULL(null, "空执行"),
;
private String code;
private String info;
}

View File

@ -14,6 +14,7 @@ public enum ResponseCode {
ILLEGAL_PARAMETER("0002", "非法参数"),
INDEX_EXCEPTION("0003", "唯一索引冲突"),
UPDATE_ZERO("0004", "更新记录为0"),
HTTP_EXCEPTION("0005", "HTTP接口调用异常"),
E0001("E0001", "不存在对应的折扣计算服务"),
E0002("E0002", "无拼团营销配置"),

11
pom.xml
View File

@ -122,6 +122,17 @@
<version>3.26.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>3.14.9</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>3.14.9</version>
</dependency>
<!-- 工程模块 -->
<dependency>
<groupId>edu.whut</groupId>