6.26 人群标签数据采集

This commit is contained in:
zhangsan 2025-06-26 13:02:00 +08:00
parent c268b51b9b
commit 3fffdb4385
37 changed files with 1259 additions and 20 deletions

View File

@ -0,0 +1,165 @@
SET NAMES utf8mb4;
CREATE database if NOT EXISTS `group_buying_sys` default character set utf8mb4 collate utf8mb4_0900_ai_ci;
use `group_buying_sys`;
DROP TABLE IF EXISTS `group_buy_activity`;
--
CREATE TABLE `group_buy_activity` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增',
`activity_id` bigint(8) NOT NULL COMMENT '活动ID',
`activity_name` varchar(128) NOT NULL COMMENT '活动名称',
`source` varchar(8) NOT NULL COMMENT '来源',
`channel` varchar(8) NOT NULL COMMENT '渠道',
`goods_id` varchar(12) NOT NULL COMMENT '商品ID',
`discount_id` varchar(8) NOT NULL COMMENT '折扣ID',
`group_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '拼团方式0自动成团、1达成目标拼团',
`take_limit_count` int(4) NOT NULL DEFAULT '1' COMMENT '拼团次数限制',
`target` int(5) NOT NULL DEFAULT '1' COMMENT '拼团目标',
`valid_time` int(4) NOT NULL DEFAULT '15' COMMENT '拼团时长(分钟)',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '活动状态0创建、1生效、2过期、3废弃',
`start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '活动开始时间',
`end_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '活动结束时间',
`tag_id` varchar(32) DEFAULT NULL COMMENT '人群标签规则标识',
`tag_scope` varchar(4) DEFAULT NULL COMMENT '人群标签规则范围多选1可见限制、2参与限制',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_activity_id` (`activity_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拼团活动';
LOCK TABLES `group_buy_activity` WRITE;
INSERT INTO `group_buy_activity` (`id`, `activity_id`, `activity_name`, `source`, `channel`, `goods_id`, `discount_id`, `group_type`, `take_limit_count`, `target`, `valid_time`, `status`, `start_time`, `end_time`, `tag_id`, `tag_scope`, `create_time`, `update_time`)
VALUES
(1,100123,'测试活动','s01','c01','9890001','25120207',0,1,1,15,0,'2025-06-19 10:19:40','2025-06-19 10:19:40','1','1','2025-06-19 10:19:40','2025-06-19 11:47:27');
UNLOCK TABLES;
DROP TABLE IF EXISTS `group_buy_discount`;
--
CREATE TABLE `group_buy_discount` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`discount_id` int(8) NOT NULL COMMENT '折扣ID',
`discount_name` varchar(64) NOT NULL COMMENT '折扣标题',
`discount_desc` varchar(256) NOT NULL COMMENT '折扣描述',
`discount_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '折扣类型0:base、1:tag',
`market_plan` varchar(4) NOT NULL DEFAULT 'ZJ' COMMENT '营销优惠计划ZJ:直减、MJ:满减、ZK:折扣、N元购',
`market_expr` varchar(32) NOT NULL COMMENT '营销优惠表达式',
`tag_id` varchar(8) DEFAULT NULL COMMENT '人群标签,特定优惠限定',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_discount_id` (`discount_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
LOCK TABLES `group_buy_discount` WRITE;
INSERT INTO `group_buy_discount` (`id`, `discount_id`, `discount_name`, `discount_desc`, `discount_type`, `market_plan`, `market_expr`, `tag_id`, `create_time`, `update_time`)
VALUES
(1,'25120207','直减优惠20元','直减优惠20元',0,'ZJ','20',NULL,'2025-06-25 14:02:13','2025-06-25 14:02:13'),
(2,'25120208','满减优惠100-10元','满减优惠100-10元',0,'MJ','100,10',NULL,'2025-06-25 14:02:13','2025-06-25 14:02:13'),
(4,'25120209','折扣优惠8折','折扣优惠8折',0,'ZK','0.8',NULL,'2025-06-25 14:02:13','2025-06-25 14:02:13'),
(5,'25120210','N元购买优惠','N元购买优惠',0,'N','1.99',NULL,'2025-06-25 14:02:13','2025-06-25 14:02:13');
UNLOCK TABLES;
DROP TABLE IF EXISTS `sku`;
CREATE TABLE `sku` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`source` varchar(8) NOT NULL COMMENT '渠道',
`channel` varchar(8) NOT NULL COMMENT '来源',
`goods_id` varchar(16) NOT NULL COMMENT '商品ID',
`goods_name` varchar(128) NOT NULL COMMENT '商品名称',
`original_price` decimal(10,2) NOT NULL COMMENT '商品价格',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_goods_id` (`goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息';
LOCK TABLES `sku` WRITE;
INSERT INTO `sku` (`id`, `source`, `channel`, `goods_id`, `goods_name`, `original_price`, `create_time`, `update_time`)
VALUES
(1,'s01','c01','9890001','《手写MyBatis渐进式源码实践》',100.00,'2025-06-22 11:10:06','2025-06-22 11:10:06');
UNLOCK TABLES;
-- 人群标签表
DROP TABLE IF EXISTS `crowd_tags`;
CREATE TABLE `crowd_tags` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`tag_id` varchar(32) NOT NULL COMMENT '人群ID',
`tag_name` varchar(64) NOT NULL COMMENT '人群名称',
`tag_desc` varchar(256) NOT NULL COMMENT '人群描述',
`statistics` int(8) NOT NULL COMMENT '人群标签统计量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_tag_id` (`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签';
LOCK TABLES `crowd_tags` WRITE;
INSERT INTO `crowd_tags` (`id`, `tag_id`, `tag_name`, `tag_desc`, `statistics`, `create_time`, `update_time`)
VALUES
(1,'RQ_KJHKL98UU78H66554GFDV','潜在消费用户','潜在消费用户',6,'2025-06-26 09:12:22','2025-06-26 09:12:22');
UNLOCK TABLES;
-- 人群标签明细表
DROP TABLE IF EXISTS `crowd_tags_detail`;
CREATE TABLE `crowd_tags_detail` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`tag_id` varchar(32) NOT NULL COMMENT '人群ID',
`user_id` varchar(16) NOT NULL COMMENT '用户ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_tag_user` (`tag_id`,`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签明细';
LOCK TABLES `crowd_tags_detail` WRITE;
INSERT INTO `crowd_tags_detail` (`id`, `tag_id`, `user_id`, `create_time`, `update_time`)
VALUES
(4,'RQ_KJHKL98UU78H66554GFDV','zy123','2025-06-26 09:08:31','2025-06-26 09:08:31'),
(5,'RQ_KJHKL98UU78H66554GFDV','smile','2025-06-26 09:09:54','2025-06-26 09:09:54');
UNLOCK TABLES;
DROP TABLE IF EXISTS `crowd_tags_job`;
-- 人群标签任务表
CREATE TABLE `crowd_tags_job` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`tag_id` varchar(32) NOT NULL COMMENT '标签ID',
`batch_id` varchar(8) NOT NULL COMMENT '批次ID',
`tag_type` tinyint(1) NOT NULL DEFAULT '1' COMMENT '标签类型(参与量、消费金额)',
`tag_rule` varchar(8) NOT NULL COMMENT '标签规则(限定类型 N次',
`stat_start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计数据,开始时间',
`stat_end_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计数据,结束时间',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '状态0初始、1计划进入执行阶段、2重置、3完成',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_batch_id` (`batch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签任务';
LOCK TABLES `crowd_tags_job` WRITE;
INSERT INTO `crowd_tags_job` (`id`, `tag_id`, `batch_id`, `tag_type`, `tag_rule`, `stat_start_time`, `stat_end_time`, `status`, `create_time`, `update_time`)
VALUES
(1,'RQ_KJHKL98UU78H66554GFDV','10001',0,'100','2025-06-26 09:13:31','2025-06-26 09:13:31',0,'2025-06-26 09:13:31','2025-06-26 09:13:31');
UNLOCK TABLES;

View File

@ -0,0 +1,82 @@
package edu.whut.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* Redis 客户端使用 Redisson <a href="https://github.com/redisson/redisson">Redisson</a>
*/
@Configuration
@EnableConfigurationProperties(RedisClientConfigProperties.class)
public class RedisClientConfig {
@Bean("redissonClient")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
Config config = new Config();
// 根据需要可以设定编解码器https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
config.setCodec(JsonJacksonCodec.INSTANCE);
config.useSingleServer()
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
// .setPassword(properties.getPassword())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive())
;
return Redisson.create(config);
}
static class RedisCodec extends BaseCodec {
private final Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
};
private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class);
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}
}

View File

@ -0,0 +1,36 @@
package edu.whut.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Redis 连接配置 <a href="https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter">redisson-spring-boot-starter</a>
*/
@Data
@ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true)
public class RedisClientConfigProperties {
/** host:ip */
private String host;
/** 端口 */
private int port;
/** 账密 */
private String password;
/** 设置连接池的大小默认为64 */
private int poolSize = 64;
/** 设置连接池的最小空闲连接数默认为10 */
private int minIdleSize = 10;
/** 设置连接的最大空闲时间单位毫秒超过该时间的空闲连接将被关闭默认为10000 */
private int idleTimeout = 10000;
/** 设置连接超时时间单位毫秒默认为10000 */
private int connectTimeout = 10000;
/** 设置连接重试次数默认为3 */
private int retryAttempts = 3;
/** 设置连接重试的间隔时间单位毫秒默认为1000 */
private int retryInterval = 1000;
/** 设置定期检查连接是否可用的时间间隔单位毫秒默认为0表示不进行定期检查 */
private int pingInterval = 0;
/** 设置是否保持长连接默认为true */
private boolean keepAlive = true;
}

View File

@ -36,6 +36,21 @@ mybatis:
mapper-locations: classpath:/mybatis/mapper/*.xml
config-location: classpath:/mybatis/config/mybatis-config.xml
# Redis
redis:
sdk:
config:
host: localhost
port: 16379
pool-size: 10
min-idle-size: 5
idle-timeout: 30000
connect-timeout: 5000
retry-attempts: 3
retry-interval: 1000
ping-interval: 60000
keep-alive: true
# 日志
logging:
level:

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.whut.infrastructure.dao.ICrowdTagsDetailDao">
<resultMap id="dataMap" type="edu.whut.infrastructure.dao.po.CrowdTagsDetail">
<id column="id" property="id"/>
<result column="tag_id" property="tagId"/>
<result column="user_id" property="userId"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
<insert id="addCrowdTagsUserId" parameterType="edu.whut.infrastructure.dao.po.CrowdTagsDetail">
insert into crowd_tags_detail(tag_id, user_id, create_time, update_time)
values (#{tagId}, #{userId}, now(), now())
</insert>
</mapper>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.whut.infrastructure.dao.ICrowdTagsJobDao">
<resultMap id="dataMap" type="edu.whut.infrastructure.dao.po.CrowdTagsJob">
<id column="id" property="id"/>
<result column="tag_id" property="tagId"/>
<result column="batch_id" property="batchId"/>
<result column="tag_type" property="tagType"/>
<result column="tag_rule" property="tagRule"/>
<result column="stat_start_time" property="statStartTime"/>
<result column="stat_end_time" property="statEndTime"/>
<result column="status" property="status"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
<select id="queryCrowdTagsJob" parameterType="edu.whut.infrastructure.dao.po.CrowdTagsJob" resultMap="dataMap">
select tag_type, tag_rule, stat_start_time, stat_end_time
from crowd_tags_job
where tag_id = #{tagId} and batch_id = #{batchId}
</select>
</mapper>

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.whut.infrastructure.dao.ICrowdTagsDao">
<resultMap id="dataMap" type="edu.whut.infrastructure.dao.po.CrowdTags">
<id column="id" property="id"/>
<result column="tag_id" property="tagId"/>
<result column="tag_name" property="tagName"/>
<result column="tag_desc" property="tagDesc"/>
<result column="statistics" property="statistics"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
<update id="updateCrowdTagsStatistics" parameterType="edu.whut.infrastructure.dao.po.CrowdTags">
update crowd_tags
set statistics = statistics + #{statistics}
where tag_id = #{tagId}
</update>
</mapper>

View File

@ -26,7 +26,7 @@ public class IIndexGroupBuyMarketServiceTest {
@Test
public void test_indexMarketTrial() throws Exception {
MarketProductEntity marketProductEntity = new MarketProductEntity();
marketProductEntity.setUserId("xiaofuge");
marketProductEntity.setUserId("smile");
marketProductEntity.setSource("s01");
marketProductEntity.setChannel("c01");
marketProductEntity.setGoodsId("9890001");

View File

@ -0,0 +1,39 @@
package edu.whut.test.domain.tag;
import edu.whut.domain.tag.service.TagService;
import edu.whut.infrastructure.redis.IRedisService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RBitSet;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* 人群标签服务测试
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ITagServiceTest {
@Resource
private TagService tagService;
@Resource
private IRedisService redisService;
@Test
public void test_tag_job() {
tagService.execTagBatchJob("RQ_KJHKL98UU78H66554GFDV", "10001");
}
@Test
public void test_get_tag_bitmap() {
RBitSet bitSet = redisService.getBitSet("RQ_KJHKL98UU78H66554GFDV");
// 是否存在
log.info("smile 存在,预期结果为 true测试结果:{}", bitSet.get(redisService.getIndexFromUserId("smile")));
log.info("gudebai 不存在,预期结果为 false测试结果:{}", bitSet.get(redisService.getIndexFromUserId("gudebai")));
}
}

View File

@ -8,6 +8,12 @@ import edu.whut.domain.activity.model.entity.TrialBalanceEntity;
*/
public interface IIndexGroupBuyMarketService {
/**
* 首页营销试算入口
* @param marketProductEntity
* @return
* @throws Exception
*/
TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception;
}

View File

@ -3,6 +3,7 @@ import edu.whut.domain.activity.model.entity.MarketProductEntity;
import edu.whut.domain.activity.model.entity.TrialBalanceEntity;
import edu.whut.domain.activity.service.trial.factory.DefaultActivityStrategyFactory;
import edu.whut.types.design.framework.tree.StrategyHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@ -11,10 +12,10 @@ import javax.annotation.Resource;
* 首页营销服务
*/
@Service
@RequiredArgsConstructor
public class IndexGroupBuyMarketServiceImpl implements IIndexGroupBuyMarketService {
@Resource
private DefaultActivityStrategyFactory defaultActivityStrategyFactory;
private final DefaultActivityStrategyFactory defaultActivityStrategyFactory;
@Override
public TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception {

View File

@ -5,10 +5,7 @@ import edu.whut.domain.activity.model.valobj.GroupBuyActivityDiscountVO;
import edu.whut.domain.activity.model.valobj.SkuVO;
import edu.whut.domain.activity.service.trial.node.RootNode;
import edu.whut.types.design.framework.tree.StrategyHandler;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.*;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@ -17,14 +14,11 @@ import java.math.BigDecimal;
* 活动策略工厂
*/
@Service
@RequiredArgsConstructor
public class DefaultActivityStrategyFactory {
private final RootNode rootNode;
public DefaultActivityStrategyFactory(RootNode rootNode) {
this.rootNode = rootNode;
}
public StrategyHandler<MarketProductEntity, DynamicContext, TrialBalanceEntity> strategyHandler() {
return rootNode;
}

View File

@ -35,7 +35,7 @@ public class MarketNode extends AbstractGroupBuyMarketSupport<MarketProductEntit
private final Map<String, IDiscountCalculateService> discountCalculateServiceMap;
// 异步加载数据
@Override
protected void multiThread(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws ExecutionException, InterruptedException, TimeoutException {
// 异步查询活动配置

View File

@ -0,0 +1,19 @@
package edu.whut.domain.tag.adapter.repository;
import edu.whut.domain.tag.model.entity.CrowdTagsJobEntity;
/**
* 人群标签仓储接口
*/
public interface ITagRepository {
//拉取批次任务配置
CrowdTagsJobEntity queryCrowdTagsJobEntity(String tagId, String batchId);
//向明细表Redis 位图双写
void addCrowdTagsUserId(String tagId, String userId);
//把累计数量写回主表的统计字段
void updateCrowdTagsStatistics(String tagId, int count);
}

View File

@ -4,4 +4,4 @@
* 2. 聚合是聚合的对象和提供基础处理对象的方法但不建议在聚合中引入仓储和接口来做过大的逻辑而这些复杂的操作应该放到service中处理
* 3. 对象名称 XxxAggregate
*/
package edu.whut.domain.yyy.model.aggregate;
package edu.whut.domain.tag.model.aggregate;

View File

@ -0,0 +1,28 @@
package edu.whut.domain.tag.model.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 批次任务对象
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CrowdTagsJobEntity {
/** 标签类型(参与量、消费金额) */
private Integer tagType;
/** 标签规则(限定类型 N次 */
private String tagRule;
/** 统计数据,开始时间 */
private Date statStartTime;
/** 统计数据,结束时间 */
private Date statEndTime;
}

View File

@ -4,4 +4,4 @@
* 2. 如果是老系统改造那么旧的库表冗余了太多的字段可能会有nv1的情况
* 3. 对象名称 XxxEntity
*/
package edu.whut.domain.yyy.model.entity;
package edu.whut.domain.tag.model.entity;

View File

@ -3,4 +3,4 @@
* 1. 用于描述对象属性的值如一个库表中有json后者一个字段多个属性信息的枚举对象
* 2. 对象名称如XxxVO
*/
package edu.whut.domain.yyy.model.valobj;
package edu.whut.domain.tag.model.valobj;

View File

@ -0,0 +1,16 @@
package edu.whut.domain.tag.service;
/**
* 人群标签服务接口
*/
public interface ITagService {
/**
* 执行人群标签批次任务
*
* @param tagId 人群ID
* @param batchId 批次ID
*/
void execTagBatchJob(String tagId, String batchId);
}

View File

@ -0,0 +1,51 @@
package edu.whut.domain.tag.service;
import edu.whut.domain.tag.adapter.repository.ITagRepository;
import edu.whut.domain.tag.model.entity.CrowdTagsJobEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* 人群标签服务
*/
@Slf4j
@Service
public class TagService implements ITagService {
@Resource
private ITagRepository repository;
/**
* 通常会被定时器或消息中间件触发用来把符合某个标签规则的用户批量写入到标签明细表和 Redis 位图中并更新标签的统计量
* @param tagId 人群ID
* @param batchId 批次ID
*/
@Override
public void execTagBatchJob(String tagId, String batchId) {
log.info("人群标签批次任务 tagId:{} batchId:{}", tagId, batchId);
// 1. 查询批次任务,拿到本次打标签要用的统计规则信息比如时间范围标签类型规则表达式等
CrowdTagsJobEntity crowdTagsJobEntity = repository.queryCrowdTagsJobEntity(tagId, batchId);
// 2. 采集用户数据 - 这部分需要采集用户的消费类数据后续有用户发起拼单后再处理
// 3. 数据写入记录暂时模拟数据
List<String> userIdList = new ArrayList<String>() {{
add("zy123");
add("smile");
}};
// 4. 把用户写入标签明细表
for (String userId : userIdList) {
repository.addCrowdTagsUserId(tagId, userId);
}
// 5. 把本次新增的用户数量累加到主表 crowd_tags.statistics 字段里统计当前有多少人打中了这个标签
repository.updateCrowdTagsStatistics(tagId, userIdList.size());
}
}

View File

@ -1 +0,0 @@
package edu.whut.domain.yyy.service;

View File

@ -18,6 +18,12 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
<!-- 系统模块 -->
<dependency>
<groupId>edu.whut</groupId>

View File

@ -0,0 +1,100 @@
package edu.whut.infrastructure.adapter.repository;
import edu.whut.domain.tag.adapter.repository.ITagRepository;
import edu.whut.domain.tag.model.entity.CrowdTagsJobEntity;
import edu.whut.infrastructure.dao.ICrowdTagsDao;
import edu.whut.infrastructure.dao.ICrowdTagsDetailDao;
import edu.whut.infrastructure.dao.ICrowdTagsJobDao;
import edu.whut.infrastructure.dao.po.CrowdTags;
import edu.whut.infrastructure.dao.po.CrowdTagsDetail;
import edu.whut.infrastructure.dao.po.CrowdTagsJob;
import edu.whut.infrastructure.redis.IRedisService;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RBitSet;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
/**
* 人群标签仓储
*/
@Repository
@RequiredArgsConstructor
public class TagRepository implements ITagRepository {
private final ICrowdTagsDao crowdTagsDao;
private final ICrowdTagsDetailDao crowdTagsDetailDao;
private final ICrowdTagsJobDao crowdTagsJobDao;
private final IRedisService redisService;
/**
* 根据标签 + 批次号查询一次人群标签批次任务配置
* 1入参由调度/业务服务提供
* 2查询 crowd_tags_job 拿到统计区间规则等信息
* 3若数据库无记录则返回 null调用方需自行判空
* @param tagId
* @param batchId
* @return
*/
@Override
public CrowdTagsJobEntity queryCrowdTagsJobEntity(String tagId, String batchId) {
CrowdTagsJob crowdTagsJobReq = new CrowdTagsJob();
crowdTagsJobReq.setTagId(tagId);
crowdTagsJobReq.setBatchId(batchId);
CrowdTagsJob crowdTagsJobRes = crowdTagsJobDao.queryCrowdTagsJob(crowdTagsJobReq);
if (null == crowdTagsJobRes) return null;
return CrowdTagsJobEntity.builder()
.tagType(crowdTagsJobRes.getTagType())
.tagRule(crowdTagsJobRes.getTagRule())
.statStartTime(crowdTagsJobRes.getStatStartTime())
.statEndTime(crowdTagsJobRes.getStatEndTime())
.build();
}
/**
* 新增标签 - 用户关系
* 先向 crowd_tags_detail 表插入一条 (tagId, userId)
* 再把 userId 映射到位图索引并写入 Redis BitSet便于高速判定某用户是否属于该标签
* 如果数据库已存在同样的唯一键会捕获 {@link org.springframework.dao.DuplicateKeyException} 并忽略
* @param tagId
* @param userId
*/
@Override
public void addCrowdTagsUserId(String tagId, String userId) {
CrowdTagsDetail crowdTagsDetailReq = new CrowdTagsDetail();
crowdTagsDetailReq.setTagId(tagId);
crowdTagsDetailReq.setUserId(userId);
try {
crowdTagsDetailDao.addCrowdTagsUserId(crowdTagsDetailReq);
// 获取BitSet
RBitSet bitSet = redisService.getBitSet(tagId);
bitSet.set(redisService.getIndexFromUserId(userId), true);
} catch (DuplicateKeyException ignore) {
// 忽略唯一索引冲突
}
}
/**
* 更新标签的统计量例如已命中的用户总数
* 一般在整批用户写入完毕后调用把本次新增 count 累加到
* crowd_tags.statistics 字段便于后续做报表或曝光
* @param tagId 标签 ID
* @param count 本次新增或回填的数量
*/
@Override
public void updateCrowdTagsStatistics(String tagId, int count) {
CrowdTags crowdTagsReq = new CrowdTags();
crowdTagsReq.setTagId(tagId);
crowdTagsReq.setStatistics(count);
crowdTagsDao.updateCrowdTagsStatistics(crowdTagsReq);
}
}

View File

@ -0,0 +1,14 @@
package edu.whut.infrastructure.dao;
import edu.whut.infrastructure.dao.po.CrowdTags;
import org.apache.ibatis.annotations.Mapper;
/**
* 人群标签
*/
@Mapper
public interface ICrowdTagsDao {
void updateCrowdTagsStatistics(CrowdTags crowdTagsReq);
}

View File

@ -0,0 +1,14 @@
package edu.whut.infrastructure.dao;
import edu.whut.infrastructure.dao.po.CrowdTagsDetail;
import org.apache.ibatis.annotations.Mapper;
/**
* 群标签明细
*/
@Mapper
public interface ICrowdTagsDetailDao {
void addCrowdTagsUserId(CrowdTagsDetail crowdTagsDetailReq);
}

View File

@ -0,0 +1,14 @@
package edu.whut.infrastructure.dao;
import edu.whut.infrastructure.dao.po.CrowdTagsJob;
import org.apache.ibatis.annotations.Mapper;
/**
* @description 人群标签任务
*/
@Mapper
public interface ICrowdTagsJobDao {
CrowdTagsJob queryCrowdTagsJob(CrowdTagsJob crowdTagsJobReq);
}

View File

@ -0,0 +1,34 @@
package edu.whut.infrastructure.dao.po;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 人群标签
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CrowdTags {
/** 自增ID */
private Long id;
/** 人群ID */
private String tagId;
/** 人群名称 */
private String tagName;
/** 人群描述 */
private String tagDesc;
/** 人群标签统计量 */
private Integer statistics;
/** 创建时间 */
private Date createTime;
/** 更新时间 */
private Date updateTime;
}

View File

@ -0,0 +1,30 @@
package edu.whut.infrastructure.dao.po;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 人群标签明细
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CrowdTagsDetail {
/** 自增ID */
private Long id;
/** 人群ID */
private String tagId;
/** 用户ID */
private String userId;
/** 创建时间 */
private Date createTime;
/** 更新时间 */
private Date updateTime;
}

View File

@ -0,0 +1,42 @@
package edu.whut.infrastructure.dao.po;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* @author Fuzhengwei bugstack.cn @小傅哥
* @description 人群标签任务
* @create 2024-12-28 11:45
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CrowdTagsJob {
/** 自增ID */
private Long id;
/** 标签ID */
private String tagId;
/** 批次ID */
private String batchId;
/** 标签类型(参与量、消费金额) */
private Integer tagType;
/** 标签规则(限定类型 N次 */
private String tagRule;
/** 统计数据,开始时间 */
private Date statStartTime;
/** 统计数据,结束时间 */
private Date statEndTime;
/** 状态0初始、1计划进入执行阶段、2重置、3完成 */
private Integer status;
/** 创建时间 */
private Date createTime;
/** 更新时间 */
private Date updateTime;
}

View File

@ -0,0 +1,287 @@
package edu.whut.infrastructure.redis;
import org.redisson.api.*;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
/**
* Redis 服务 统一方法名上层业务只依赖 IRedisService 这个接口不用关心用的是 RedissonLettuce 还是别的
*/
public interface IRedisService {
/**
* 设置指定 key 的值
*
* @param key
* @param value
*/
<T> void setValue(String key, T value);
/**
* 设置指定 key 的值
*
* @param key
* @param value
* @param expired 过期时间
*/
<T> void setValue(String key, T value, long expired);
/**
* 获取指定 key 的值
*
* @param key
* @return
*/
<T> T getValue(String key);
/**
* 获取队列
*
* @param key
* @param <T> 泛型
* @return 队列
*/
<T> RQueue<T> getQueue(String key);
/**
* 加锁队列
*
* @param key
* @param <T> 泛型
* @return 队列
*/
<T> RBlockingQueue<T> getBlockingQueue(String key);
/**
* 延迟队列
*
* @param rBlockingQueue 加锁队列
* @param <T> 泛型
* @return 队列
*/
<T> RDelayedQueue<T> getDelayedQueue(RBlockingQueue<T> rBlockingQueue);
/**
* 设置值
*
* @param key key
* @param value
*/
void setAtomicLong(String key, long value);
/**
* 获取值
*
* @param key key
*/
Long getAtomicLong(String key);
/**
* 自增 Key 的值1234
*
* @param key
* @return 自增后的值
*/
long incr(String key);
/**
* 指定值自增 Key 的值1234
*
* @param key
* @return 自增后的值
*/
long incrBy(String key, long delta);
/**
* 自减 Key 的值1234
*
* @param key
* @return 自增后的值
*/
long decr(String key);
/**
* 指定值自增 Key 的值1234
*
* @param key
* @return 自增后的值
*/
long decrBy(String key, long delta);
/**
* 移除指定 key 的值
*
* @param key
*/
void remove(String key);
/**
* 判断指定 key 的值是否存在
*
* @param key
* @return true/false
*/
boolean isExists(String key);
/**
* 将指定的值添加到集合中
*
* @param key
* @param value
*/
void addToSet(String key, String value);
/**
* 判断指定的值是否是集合的成员
*
* @param key
* @param value
* @return 如果是集合的成员返回 true否则返回 false
*/
boolean isSetMember(String key, String value);
/**
* 将指定的值添加到列表中
*
* @param key
* @param value
*/
void addToList(String key, String value);
/**
* 获取列表中指定索引的值
*
* @param key
* @param index 索引
* @return
*/
String getFromList(String key, int index);
/**
* 获取Map
*
* @param key
* @return
*/
<K, V> RMap<K, V> getMap(String key);
/**
* 将指定的键值对添加到哈希表中
*
* @param key
* @param field 字段
* @param value
*/
void addToMap(String key, String field, String value);
/**
* 获取哈希表中指定字段的值
*
* @param key
* @param field 字段
* @return
*/
String getFromMap(String key, String field);
/**
* 获取哈希表中指定字段的值
*
* @param key
* @param field 字段
* @return
*/
<K, V> V getFromMap(String key, K field);
/**
* 将指定的值添加到有序集合中
*
* @param key
* @param value
*/
void addToSortedSet(String key, String value);
/**
* 获取 Redis 可重入锁
*
* @param key
* @return Lock
*/
RLock getLock(String key);
/**
* 获取 Redis 公平锁
*
* @param key
* @return Lock
*/
RLock getFairLock(String key);
/**
* 获取 Redis 读写锁
*
* @param key
* @return RReadWriteLock
*/
RReadWriteLock getReadWriteLock(String key);
/**
* 获取 Redis 信号量
*
* @param key
* @return RSemaphore
*/
RSemaphore getSemaphore(String key);
/**
* 获取 Redis 过期信号量
* <p>
* 基于Redis的Redisson的分布式信号量SemaphoreJava对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法
* 同时还提供了异步Async反射式Reactive和RxJava2标准的接口
*
* @param key
* @return RPermitExpirableSemaphore
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(String key);
/**
* 闭锁
*
* @param key
* @return RCountDownLatch
*/
RCountDownLatch getCountDownLatch(String key);
/**
* 布隆过滤器
*
* @param key
* @param <T> 存放对象
* @return 返回结果
*/
<T> RBloomFilter<T> getBloomFilter(String key);
Boolean setNx(String key);
Boolean setNx(String key, long expired, TimeUnit timeUnit);
RBitSet getBitSet(String key);
default int getIndexFromUserId(String userId) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] hashBytes = md.digest(userId.getBytes(StandardCharsets.UTF_8));
// 将哈希字节数组转换为正整数
BigInteger bigInt = new BigInteger(1, hashBytes);
// 取模以确保索引在合理范围内
return bigInt.mod(BigInteger.valueOf(Integer.MAX_VALUE)).intValue();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}
}

View File

@ -0,0 +1,182 @@
package edu.whut.infrastructure.redis;
import org.redisson.api.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
* Redis 服务 - Redisson
*/
@Service("redissonService")
public class RedissonService implements IRedisService {
@Resource
private RedissonClient redissonClient;
public <T> void setValue(String key, T value) {
redissonClient.<T>getBucket(key).set(value);
}
@Override
public <T> void setValue(String key, T value, long expired) {
RBucket<T> bucket = redissonClient.getBucket(key);
bucket.set(value, Duration.ofMillis(expired));
}
public <T> T getValue(String key) {
return redissonClient.<T>getBucket(key).get();
}
@Override
public <T> RQueue<T> getQueue(String key) {
return redissonClient.getQueue(key);
}
@Override
public <T> RBlockingQueue<T> getBlockingQueue(String key) {
return redissonClient.getBlockingQueue(key);
}
@Override
public <T> RDelayedQueue<T> getDelayedQueue(RBlockingQueue<T> rBlockingQueue) {
return redissonClient.getDelayedQueue(rBlockingQueue);
}
@Override
public void setAtomicLong(String key, long value) {
redissonClient.getAtomicLong(key).set(value);
}
@Override
public Long getAtomicLong(String key) {
return redissonClient.getAtomicLong(key).get();
}
@Override
public long incr(String key) {
return redissonClient.getAtomicLong(key).incrementAndGet();
}
@Override
public long incrBy(String key, long delta) {
return redissonClient.getAtomicLong(key).addAndGet(delta);
}
@Override
public long decr(String key) {
return redissonClient.getAtomicLong(key).decrementAndGet();
}
@Override
public long decrBy(String key, long delta) {
return redissonClient.getAtomicLong(key).addAndGet(-delta);
}
@Override
public void remove(String key) {
redissonClient.getBucket(key).delete();
}
@Override
public boolean isExists(String key) {
return redissonClient.getBucket(key).isExists();
}
public void addToSet(String key, String value) {
RSet<String> set = redissonClient.getSet(key);
set.add(value);
}
public boolean isSetMember(String key, String value) {
RSet<String> set = redissonClient.getSet(key);
return set.contains(value);
}
public void addToList(String key, String value) {
RList<String> list = redissonClient.getList(key);
list.add(value);
}
public String getFromList(String key, int index) {
RList<String> list = redissonClient.getList(key);
return list.get(index);
}
@Override
public <K, V> RMap<K, V> getMap(String key) {
return redissonClient.getMap(key);
}
public void addToMap(String key, String field, String value) {
RMap<String, String> map = redissonClient.getMap(key);
map.put(field, value);
}
public String getFromMap(String key, String field) {
RMap<String, String> map = redissonClient.getMap(key);
return map.get(field);
}
@Override
public <K, V> V getFromMap(String key, K field) {
return redissonClient.<K, V>getMap(key).get(field);
}
public void addToSortedSet(String key, String value) {
RSortedSet<String> sortedSet = redissonClient.getSortedSet(key);
sortedSet.add(value);
}
@Override
public RLock getLock(String key) {
return redissonClient.getLock(key);
}
@Override
public RLock getFairLock(String key) {
return redissonClient.getFairLock(key);
}
@Override
public RReadWriteLock getReadWriteLock(String key) {
return redissonClient.getReadWriteLock(key);
}
@Override
public RSemaphore getSemaphore(String key) {
return redissonClient.getSemaphore(key);
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String key) {
return redissonClient.getPermitExpirableSemaphore(key);
}
@Override
public RCountDownLatch getCountDownLatch(String key) {
return redissonClient.getCountDownLatch(key);
}
@Override
public <T> RBloomFilter<T> getBloomFilter(String key) {
return redissonClient.getBloomFilter(key);
}
@Override
public Boolean setNx(String key) {
return redissonClient.getBucket(key).trySet("lock");
}
@Override
public Boolean setNx(String key, long expired, TimeUnit timeUnit) {
return redissonClient.getBucket(key).trySet("lock", expired, timeUnit);
}
@Override
public RBitSet getBitSet(String key) {
return redissonClient.getBitSet(key);
}
}

View File

@ -1,4 +0,0 @@
/**
* 提供redis链接配置
*/
package edu.whut.infrastructure.redis;

View File

@ -116,6 +116,12 @@
<version>1.15</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.26.0</version>
</dependency>
<!-- 工程模块 -->
<dependency>
<groupId>edu.whut</groupId>