From 7cb33348f1b106b57e5ae5cf9845e2345705691d Mon Sep 17 00:00:00 2001 From: zhangsan <646228430@qq.com> Date: Thu, 24 Jul 2025 17:16:08 +0800 Subject: [PATCH] =?UTF-8?q?7.24=20redission=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=99=90=E6=B5=81+=E9=BB=91=E5=90=8D=E5=8D=95+=E9=99=8D?= =?UTF-8?q?=E7=BA=A7=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/dev-ops/docker-compose-environment.yml | 21 ++- .../whut/config/RateLimiterAutoConfig.java | 17 ++ group-buying-sys-infrastructure/pom.xml | 4 + .../infrastructure/aop/RateLimiterAOP.java | 160 ++++++++++++++++++ .../edu/whut/trigger/http/DCCController.java | 1 + .../trigger/http/MarketIndexController.java | 10 ++ .../RateLimiterAccessInterceptor.java | 26 +++ .../edu/whut/types/enums/ResponseCode.java | 1 + 8 files changed, 231 insertions(+), 9 deletions(-) create mode 100644 group-buying-sys-app/src/main/java/edu/whut/config/RateLimiterAutoConfig.java create mode 100644 group-buying-sys-infrastructure/src/main/java/edu/whut/infrastructure/aop/RateLimiterAOP.java create mode 100644 group-buying-sys-types/src/main/java/edu/whut/types/annotations/RateLimiterAccessInterceptor.java diff --git a/docs/dev-ops/docker-compose-environment.yml b/docs/dev-ops/docker-compose-environment.yml index c4b0c67..e786aa9 100644 --- a/docs/dev-ops/docker-compose-environment.yml +++ b/docs/dev-ops/docker-compose-environment.yml @@ -4,7 +4,7 @@ services: image: mysql:8.0 container_name: mysql command: --default-authentication-plugin=mysql_native_password - restart: always + restart: unless-stopped environment: TZ: Asia/Shanghai MYSQL_ROOT_PASSWORD: 123456 @@ -20,11 +20,12 @@ services: retries: 10 start_period: 15s networks: - - my-network + - group-buy-network # phpmyadmin https://hub.docker.com/_/phpmyadmin phpmyadmin: image: phpmyadmin:5.2.1 + restart: unless-stopped container_name: phpmyadmin hostname: phpmyadmin ports: @@ -37,13 +38,13 @@ services: mysql: condition: service_healthy networks: - - my-network + - group-buy-network # Redis redis: image: redis:6.2 + restart: unless-stopped container_name: redis - restart: always hostname: redis privileged: true ports: @@ -52,7 +53,7 @@ services: - ./redis/redis.conf:/usr/local/etc/redis/redis.conf command: redis-server /usr/local/etc/redis/redis.conf networks: - - my-network + - group-buy-network healthcheck: test: [ "CMD", "redis-cli", "ping" ] interval: 10s @@ -64,7 +65,7 @@ services: image: spryker/redis-commander:0.8.0 container_name: redis-admin hostname: redis-commander - restart: always + restart: unless-stopped ports: - 8081:8081 environment: @@ -75,7 +76,7 @@ services: - LANGUAGE=C.UTF-8 - LC_ALL=C.UTF-8 networks: - - my-network + - group-buy-network depends_on: redis: condition: service_healthy @@ -86,7 +87,7 @@ services: rabbitmq: image: rabbitmq:3.8-management container_name: rabbitmq - restart: always + restart: unless-stopped ports: - "5672:5672" - "15672:15672" @@ -97,7 +98,9 @@ services: volumes: - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins - ./rabbitmq/mq-data:/var/lib/rabbitmq + networks: + - group-buy-network networks: - my-network: + group-buy-network: driver: bridge \ No newline at end of file diff --git a/group-buying-sys-app/src/main/java/edu/whut/config/RateLimiterAutoConfig.java b/group-buying-sys-app/src/main/java/edu/whut/config/RateLimiterAutoConfig.java new file mode 100644 index 0000000..7821414 --- /dev/null +++ b/group-buying-sys-app/src/main/java/edu/whut/config/RateLimiterAutoConfig.java @@ -0,0 +1,17 @@ +package edu.whut.config; +import edu.whut.infrastructure.aop.RateLimiterAOP; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 限流配置 + */ +@Configuration +public class RateLimiterAutoConfig { + + @Bean + public RateLimiterAOP rateLimiterAOP() { + return new RateLimiterAOP(); + } + +} diff --git a/group-buying-sys-infrastructure/pom.xml b/group-buying-sys-infrastructure/pom.xml index 4c8c2f4..e77c884 100644 --- a/group-buying-sys-infrastructure/pom.xml +++ b/group-buying-sys-infrastructure/pom.xml @@ -37,6 +37,10 @@ org.springframework.boot spring-boot-starter-amqp + + org.springframework.boot + spring-boot-starter-aop + edu.whut diff --git a/group-buying-sys-infrastructure/src/main/java/edu/whut/infrastructure/aop/RateLimiterAOP.java b/group-buying-sys-infrastructure/src/main/java/edu/whut/infrastructure/aop/RateLimiterAOP.java new file mode 100644 index 0000000..e007eb7 --- /dev/null +++ b/group-buying-sys-infrastructure/src/main/java/edu/whut/infrastructure/aop/RateLimiterAOP.java @@ -0,0 +1,160 @@ +package edu.whut.infrastructure.aop; + +import edu.whut.types.annotations.DCCValue; +import edu.whut.types.annotations.RateLimiterAccessInterceptor; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.Signature; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.aspectj.lang.reflect.MethodSignature; +import org.redisson.api.RAtomicLong; +import org.redisson.api.RRateLimiter; +import org.redisson.api.RateIntervalUnit; +import org.redisson.api.RateType; +import org.redisson.api.RedissonClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Resource; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +/** + * 分布式限流切面,基于 Redisson 的 RRateLimiter 和 RAtomicLong 实现 + */ +@Aspect +public class RateLimiterAOP { + + private final Logger log = LoggerFactory.getLogger(RateLimiterAOP.class); + + /** + * 全局开关:open/close + */ + @DCCValue("rateLimiterSwitch:open") + private String rateLimiterSwitch; + + /** + * Redisson 客户端,注入使用 + */ + @Resource + private RedissonClient redissonClient; + + @Pointcut("@annotation(edu.whut.types.annotations.RateLimiterAccessInterceptor)") + public void aopPoint() {} + + @Around("aopPoint() && @annotation(rateLimiterAccessInterceptor)") + public Object doRouter(ProceedingJoinPoint jp, + RateLimiterAccessInterceptor rateLimiterAccessInterceptor) throws Throwable { + // 0. 全局开关 + if (StringUtils.isBlank(rateLimiterSwitch) || "close".equals(rateLimiterSwitch)) { + return jp.proceed(); + } + + // 1. 获取限流维度 key + String key = rateLimiterAccessInterceptor.key(); + if (StringUtils.isBlank(key)) { + throw new RuntimeException("annotation RateLimiter key is null!"); + } + String keyAttr = getAttrValue(key, jp.getArgs()); + log.info("[RateLimiter] attr={}, permits={}, blacklistCount={}", + keyAttr, + rateLimiterAccessInterceptor.permitsPerSecond(), + rateLimiterAccessInterceptor.blacklistCount()); + + // 2. 黑名单检查(分布式,24h) rl:ratelimit bl:blacklist + // 存储的是 “用户在这一轮限流中被拒绝的次数”,大于blacklistLimit则被视作进入黑名单,等key释放解决黑名单 + double blacklistLimit = rateLimiterAccessInterceptor.blacklistCount(); + if (blacklistLimit > 0) { + RAtomicLong blCounter = redissonClient.getAtomicLong("rl:bl:" + keyAttr); + if (blCounter.isExists() && blCounter.get() > blacklistLimit) { + log.info("[RateLimiter] 黑名单拦截: {}", keyAttr); + return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod()); + } + } + + // 3. 获取或创建分布式 RateLimiter + RRateLimiter limiter = redissonClient.getRateLimiter("rl:limiter:" + keyAttr); + // 尝试设置速率,每秒放n个令牌 若已设置则返回 false + limiter.trySetRate(RateType.OVERALL, + (long) rateLimiterAccessInterceptor.permitsPerSecond(), + 1, RateIntervalUnit.SECONDS); + + // 4. 尝试获取令牌,如果取不到,则返回false + boolean allowed = limiter.tryAcquire(); + if (!allowed) { + // 超限后计入黑名单 + if (blacklistLimit > 0) { + RAtomicLong blCounter = redissonClient.getAtomicLong("rl:bl:" + keyAttr); + long count = blCounter.incrementAndGet(); + if (count == 1) { + blCounter.expire(24, TimeUnit.HOURS); + } + } + log.info("[RateLimiter] 限流拦截: {}", keyAttr); + return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod()); + } + + // 5. 正常执行 + return jp.proceed(); + } + + /** + * 调用用户配置的降级方法 + */ + private Object fallbackMethodResult(JoinPoint jp, String fallbackMethod) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Signature sig = jp.getSignature(); + MethodSignature ms = (MethodSignature) sig; + Method method = jp.getTarget().getClass() + .getMethod(fallbackMethod, ms.getParameterTypes()); + return method.invoke(jp.getTarget(), jp.getArgs()); + } + + /** + * 从方法参数中获取 attr 字段值 + */ + private String getAttrValue(String attr, Object[] args) { + if (args == null || args.length == 0) return null; + if (args[0] instanceof String) { + return args[0].toString(); + } + for (Object arg : args) { + String val = extractField(arg, attr); + if (StringUtils.isNotBlank(val)) { + return val; + } + } + return null; + } + + private String extractField(Object obj, String name) { + try { + Field field = getFieldByName(obj, name); + if (field == null) return null; + field.setAccessible(true); + Object v = field.get(obj); + field.setAccessible(false); + return v != null ? v.toString() : null; + } catch (Exception e) { + log.warn("[RateLimiter] 提取字段失败 {}", name, e); + return null; + } + } + + private Field getFieldByName(Object obj, String name) { + Class cls = obj.getClass(); + while (cls != null) { + try { + return cls.getDeclaredField(name); + } catch (NoSuchFieldException e) { + cls = cls.getSuperclass(); + } + } + return null; + } +} diff --git a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/DCCController.java b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/DCCController.java index c130f62..bfb375b 100644 --- a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/DCCController.java +++ b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/DCCController.java @@ -25,6 +25,7 @@ public class DCCController implements IDCCService { * 动态值变更 * curl http://localhost:8091/api/v1/gbm/dcc/update_config?key=downgradeSwitch&value=1 * curl http://localhost:8091/api/v1/gbm/dcc/update_config?key=cutRange&value=0 + * curl http://127.0.0.1:8091/api/v1/gbm/dcc/update_config?key=rateLimiterSwitch&value=close */ @GetMapping("update_config") @Override diff --git a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/MarketIndexController.java b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/MarketIndexController.java index f107e9b..d627d2d 100644 --- a/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/MarketIndexController.java +++ b/group-buying-sys-trigger/src/main/java/edu/whut/trigger/http/MarketIndexController.java @@ -11,6 +11,7 @@ import edu.whut.domain.activity.model.entity.UserGroupBuyOrderDetailEntity; import edu.whut.domain.activity.model.valobj.GroupBuyActivityDiscountVO; import edu.whut.domain.activity.model.valobj.TeamStatisticVO; import edu.whut.domain.activity.service.IIndexGroupBuyMarketService; +import edu.whut.types.annotations.RateLimiterAccessInterceptor; import edu.whut.types.enums.ResponseCode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -31,6 +32,7 @@ public class MarketIndexController implements IMarketIndexService { private final IIndexGroupBuyMarketService indexGroupBuyMarketService; + @RateLimiterAccessInterceptor(key = "userId", fallbackMethod = "queryGroupBuyMarketConfigFallBack", permitsPerSecond = 1.0d, blacklistCount = 1) @PostMapping("/query_group_buy_market_config") @Override public Response queryGroupBuyMarketConfig( @@ -113,4 +115,12 @@ public class MarketIndexController implements IMarketIndexService { } } + public Response queryGroupBuyMarketConfigFallBack(@RequestBody GoodsMarketRequestDTO requestDTO) { + log.error("查询拼团营销配置限流:{}", requestDTO.getUserId()); + return Response.builder() + .code(ResponseCode.RATE_LIMITER.getCode()) + .info(ResponseCode.RATE_LIMITER.getInfo()) + .build(); + } + } diff --git a/group-buying-sys-types/src/main/java/edu/whut/types/annotations/RateLimiterAccessInterceptor.java b/group-buying-sys-types/src/main/java/edu/whut/types/annotations/RateLimiterAccessInterceptor.java new file mode 100644 index 0000000..a31c3de --- /dev/null +++ b/group-buying-sys-types/src/main/java/edu/whut/types/annotations/RateLimiterAccessInterceptor.java @@ -0,0 +1,26 @@ +package edu.whut.types.annotations; + +import java.lang.annotation.*; + +/** + * 1.标记切入点,方法上贴了这个注解的,都要被拦截 + * 2.携带限流配置,供 Advice 读取 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +@Documented +public @interface RateLimiterAccessInterceptor { + + /** 用哪个字段作为拦截标识,未配置则默认走全部 */ + String key() default "all"; + + /** 限制频次(每秒请求次数) */ + double permitsPerSecond(); + + /** 黑名单拦截(多少次限制后加入黑名单)0 不限制 */ + double blacklistCount() default 0; + + /** 拦截后的执行方法 */ + String fallbackMethod(); + +} diff --git a/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java b/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java index 5569f7e..8e74323 100644 --- a/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java +++ b/group-buying-sys-types/src/main/java/edu/whut/types/enums/ResponseCode.java @@ -15,6 +15,7 @@ public enum ResponseCode { INDEX_EXCEPTION("0003", "唯一索引冲突"), UPDATE_ZERO("0004", "更新记录为0"), HTTP_EXCEPTION("0005", "HTTP接口调用异常"), + RATE_LIMITER("0006", "接口限流"), E0001("E0001", "不存在对应的折扣计算服务"), E0002("E0002", "无拼团营销配置"),