7.24 redission分布式限流+黑名单+降级回调

This commit is contained in:
zhangsan 2025-07-24 17:16:08 +08:00
parent ca163978f2
commit 7cb33348f1
8 changed files with 231 additions and 9 deletions

View File

@ -4,7 +4,7 @@ services:
image: mysql:8.0
container_name: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
restart: unless-stopped
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: 123456
@ -20,11 +20,12 @@ services:
retries: 10
start_period: 15s
networks:
- my-network
- group-buy-network
# phpmyadmin https://hub.docker.com/_/phpmyadmin
phpmyadmin:
image: phpmyadmin:5.2.1
restart: unless-stopped
container_name: phpmyadmin
hostname: phpmyadmin
ports:
@ -37,13 +38,13 @@ services:
mysql:
condition: service_healthy
networks:
- my-network
- group-buy-network
# Redis
redis:
image: redis:6.2
restart: unless-stopped
container_name: redis
restart: always
hostname: redis
privileged: true
ports:
@ -52,7 +53,7 @@ services:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
command: redis-server /usr/local/etc/redis/redis.conf
networks:
- my-network
- group-buy-network
healthcheck:
test: [ "CMD", "redis-cli", "ping" ]
interval: 10s
@ -64,7 +65,7 @@ services:
image: spryker/redis-commander:0.8.0
container_name: redis-admin
hostname: redis-commander
restart: always
restart: unless-stopped
ports:
- 8081:8081
environment:
@ -75,7 +76,7 @@ services:
- LANGUAGE=C.UTF-8
- LC_ALL=C.UTF-8
networks:
- my-network
- group-buy-network
depends_on:
redis:
condition: service_healthy
@ -86,7 +87,7 @@ services:
rabbitmq:
image: rabbitmq:3.8-management
container_name: rabbitmq
restart: always
restart: unless-stopped
ports:
- "5672:5672"
- "15672:15672"
@ -97,7 +98,9 @@ services:
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./rabbitmq/mq-data:/var/lib/rabbitmq
networks:
- group-buy-network
networks:
my-network:
group-buy-network:
driver: bridge

View File

@ -0,0 +1,17 @@
package edu.whut.config;
import edu.whut.infrastructure.aop.RateLimiterAOP;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 限流配置
*/
@Configuration
public class RateLimiterAutoConfig {
@Bean
public RateLimiterAOP rateLimiterAOP() {
return new RateLimiterAOP();
}
}

View File

@ -37,6 +37,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- 系统模块 -->
<dependency>
<groupId>edu.whut</groupId>

View File

@ -0,0 +1,160 @@
package edu.whut.infrastructure.aop;
import edu.whut.types.annotations.DCCValue;
import edu.whut.types.annotations.RateLimiterAccessInterceptor;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* 分布式限流切面基于 Redisson RRateLimiter RAtomicLong 实现
*/
@Aspect
public class RateLimiterAOP {
private final Logger log = LoggerFactory.getLogger(RateLimiterAOP.class);
/**
* 全局开关open/close
*/
@DCCValue("rateLimiterSwitch:open")
private String rateLimiterSwitch;
/**
* Redisson 客户端注入使用
*/
@Resource
private RedissonClient redissonClient;
@Pointcut("@annotation(edu.whut.types.annotations.RateLimiterAccessInterceptor)")
public void aopPoint() {}
@Around("aopPoint() && @annotation(rateLimiterAccessInterceptor)")
public Object doRouter(ProceedingJoinPoint jp,
RateLimiterAccessInterceptor rateLimiterAccessInterceptor) throws Throwable {
// 0. 全局开关
if (StringUtils.isBlank(rateLimiterSwitch) || "close".equals(rateLimiterSwitch)) {
return jp.proceed();
}
// 1. 获取限流维度 key
String key = rateLimiterAccessInterceptor.key();
if (StringUtils.isBlank(key)) {
throw new RuntimeException("annotation RateLimiter key is null");
}
String keyAttr = getAttrValue(key, jp.getArgs());
log.info("[RateLimiter] attr={}, permits={}, blacklistCount={}",
keyAttr,
rateLimiterAccessInterceptor.permitsPerSecond(),
rateLimiterAccessInterceptor.blacklistCount());
// 2. 黑名单检查分布式24h rl:ratelimit bl:blacklist
// 存储的是 用户在这一轮限流中被拒绝的次数大于blacklistLimit则被视作进入黑名单等key释放解决黑名单
double blacklistLimit = rateLimiterAccessInterceptor.blacklistCount();
if (blacklistLimit > 0) {
RAtomicLong blCounter = redissonClient.getAtomicLong("rl:bl:" + keyAttr);
if (blCounter.isExists() && blCounter.get() > blacklistLimit) {
log.info("[RateLimiter] 黑名单拦截: {}", keyAttr);
return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod());
}
}
// 3. 获取或创建分布式 RateLimiter
RRateLimiter limiter = redissonClient.getRateLimiter("rl:limiter:" + keyAttr);
// 尝试设置速率每秒放n个令牌 若已设置则返回 false
limiter.trySetRate(RateType.OVERALL,
(long) rateLimiterAccessInterceptor.permitsPerSecond(),
1, RateIntervalUnit.SECONDS);
// 4. 尝试获取令牌如果取不到则返回false
boolean allowed = limiter.tryAcquire();
if (!allowed) {
// 超限后计入黑名单
if (blacklistLimit > 0) {
RAtomicLong blCounter = redissonClient.getAtomicLong("rl:bl:" + keyAttr);
long count = blCounter.incrementAndGet();
if (count == 1) {
blCounter.expire(24, TimeUnit.HOURS);
}
}
log.info("[RateLimiter] 限流拦截: {}", keyAttr);
return fallbackMethodResult(jp, rateLimiterAccessInterceptor.fallbackMethod());
}
// 5. 正常执行
return jp.proceed();
}
/**
* 调用用户配置的降级方法
*/
private Object fallbackMethodResult(JoinPoint jp, String fallbackMethod)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Signature sig = jp.getSignature();
MethodSignature ms = (MethodSignature) sig;
Method method = jp.getTarget().getClass()
.getMethod(fallbackMethod, ms.getParameterTypes());
return method.invoke(jp.getTarget(), jp.getArgs());
}
/**
* 从方法参数中获取 attr 字段值
*/
private String getAttrValue(String attr, Object[] args) {
if (args == null || args.length == 0) return null;
if (args[0] instanceof String) {
return args[0].toString();
}
for (Object arg : args) {
String val = extractField(arg, attr);
if (StringUtils.isNotBlank(val)) {
return val;
}
}
return null;
}
private String extractField(Object obj, String name) {
try {
Field field = getFieldByName(obj, name);
if (field == null) return null;
field.setAccessible(true);
Object v = field.get(obj);
field.setAccessible(false);
return v != null ? v.toString() : null;
} catch (Exception e) {
log.warn("[RateLimiter] 提取字段失败 {}", name, e);
return null;
}
}
private Field getFieldByName(Object obj, String name) {
Class<?> cls = obj.getClass();
while (cls != null) {
try {
return cls.getDeclaredField(name);
} catch (NoSuchFieldException e) {
cls = cls.getSuperclass();
}
}
return null;
}
}

View File

@ -25,6 +25,7 @@ public class DCCController implements IDCCService {
* 动态值变更
* curl http://localhost:8091/api/v1/gbm/dcc/update_config?key=downgradeSwitch&value=1
* curl http://localhost:8091/api/v1/gbm/dcc/update_config?key=cutRange&value=0
* curl http://127.0.0.1:8091/api/v1/gbm/dcc/update_config?key=rateLimiterSwitch&value=close
*/
@GetMapping("update_config")
@Override

View File

@ -11,6 +11,7 @@ import edu.whut.domain.activity.model.entity.UserGroupBuyOrderDetailEntity;
import edu.whut.domain.activity.model.valobj.GroupBuyActivityDiscountVO;
import edu.whut.domain.activity.model.valobj.TeamStatisticVO;
import edu.whut.domain.activity.service.IIndexGroupBuyMarketService;
import edu.whut.types.annotations.RateLimiterAccessInterceptor;
import edu.whut.types.enums.ResponseCode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -31,6 +32,7 @@ public class MarketIndexController implements IMarketIndexService {
private final IIndexGroupBuyMarketService indexGroupBuyMarketService;
@RateLimiterAccessInterceptor(key = "userId", fallbackMethod = "queryGroupBuyMarketConfigFallBack", permitsPerSecond = 1.0d, blacklistCount = 1)
@PostMapping("/query_group_buy_market_config")
@Override
public Response<GoodsMarketResponseDTO> queryGroupBuyMarketConfig(
@ -113,4 +115,12 @@ public class MarketIndexController implements IMarketIndexService {
}
}
public Response<GoodsMarketResponseDTO> queryGroupBuyMarketConfigFallBack(@RequestBody GoodsMarketRequestDTO requestDTO) {
log.error("查询拼团营销配置限流:{}", requestDTO.getUserId());
return Response.<GoodsMarketResponseDTO>builder()
.code(ResponseCode.RATE_LIMITER.getCode())
.info(ResponseCode.RATE_LIMITER.getInfo())
.build();
}
}

View File

@ -0,0 +1,26 @@
package edu.whut.types.annotations;
import java.lang.annotation.*;
/**
* 1.标记切入点方法上贴了这个注解的都要被拦截
* 2.携带限流配置 Advice 读取
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RateLimiterAccessInterceptor {
/** 用哪个字段作为拦截标识,未配置则默认走全部 */
String key() default "all";
/** 限制频次(每秒请求次数) */
double permitsPerSecond();
/** 黑名单拦截多少次限制后加入黑名单0 不限制 */
double blacklistCount() default 0;
/** 拦截后的执行方法 */
String fallbackMethod();
}

View File

@ -15,6 +15,7 @@ public enum ResponseCode {
INDEX_EXCEPTION("0003", "唯一索引冲突"),
UPDATE_ZERO("0004", "更新记录为0"),
HTTP_EXCEPTION("0005", "HTTP接口调用异常"),
RATE_LIMITER("0006", "接口限流"),
E0001("E0001", "不存在对应的折扣计算服务"),
E0002("E0002", "无拼团营销配置"),