#前言
在最近工作中承接了部分技改项需求,需要在流量高峰期对非紧密关联业务的缓存写操作进行速率限制,重保缓存读操作,保证业务稳定。 考虑到该部分需要限流业务的部署情况,一开始就敲定了单机限流的前提。 同时,在做了进一步调研之后,决定采取 Google Guava 包中成熟的令牌桶限流。
#令牌桶算法
令牌桶的限流原理可分为以下几步:
-
令牌桶以设定速率产生令牌并放入桶中,当令牌数达到桶的令牌上限时不再继续生成;
-
每当有新请求到来时都会先去令牌桶获取令牌,只有拿到令牌的请求才会放行;
-
根据获取令牌的方式,令牌桶对请求的处理方式也不同,令牌的获取分为阻塞式获取和非阻塞式获取:
- 阻塞式:若当前请求获取到令牌则放行,否则请求所在线程阻塞直至获取到令牌;
- 非阻塞式:若当前请求获取到令牌则放行,否则立即返回失败。
#核心API
#令牌桶创建
// 创建平滑突发限流实例
public static RateLimiter create(double permitsPerSecond) {
return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
RateLimiter rateLimiter = new SmoothRateLimiter.SmoothBursty(stopwatch, 1.0);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(RateLimiter.SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch, null);
this.maxBurstSeconds = maxBurstSeconds;
}
// 创建平滑预热限流实例
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor) {
RateLimiter rateLimiter = new SmoothRateLimiter.SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothWarmingUp(RateLimiter.SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
super(stopwatch, null);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
this.coldFactor = coldFactor;
}
RateLimiter
有 SmoothBursty
和 SmoothWarmingUp
两个实现类,分别表示平滑突发限流和平滑预热限流,前者生成令牌的速率是恒定的,后置存在一个预热期,在预热期内令牌的速度是慢慢增加直至到达固定速度为止。
#设置限流器的速率
// 对外提供的方法
public final void setRate(double permitsPerSecond) {
Preconditions.checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized(this.mutex()) {
this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
}
}
// 实际调用方法
final void doSetRate(double permitsPerSecond, long nowMicros) {
this.resync(nowMicros);
double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
this.doSetRate(permitsPerSecond, stableIntervalMicros);
}
// resync方法
private void resync(long nowMicros) {
if (nowMicros > this.nextFreeTicketMicros) {
this.storedPermits = Math.min(this.maxPermits, this.storedPermits + (double)(nowMicros - this.nextFreeTicketMicros) / this.stableIntervalMicros);
this.nextFreeTicketMicros = nowMicros;
}
}
// SmoothBursty平滑突发限流实现
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
this.storedPermits = this.maxPermits;
} else {
this.storedPermits = oldMaxPermits == 0.0 ? 0.0 : this.storedPermits * this.maxPermits / oldMaxPermits;
}
}
// SmoothWarmingUp平滑预热限流实现
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
this.maxPermits = (double)this.warmupPeriodMicros / stableIntervalMicros;
this.halfPermits = this.maxPermits / 2.0;
double coldIntervalMicros = stableIntervalMicros * 3.0;
this.slope = (coldIntervalMicros - stableIntervalMicros) / this.halfPermits;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
this.storedPermits = 0.0;
} else {
this.storedPermits = oldMaxPermits == 0.0 ? this.maxPermits : this.storedPermits * this.maxPermits / oldMaxPermits;
}
}
设置限流器速率时,根据传入的每秒产生的令牌数 permitsPerSecond
计算出并产生一个令牌所需的微秒数 stableIntervalMicros
。
然后在 resync
方法中,对令牌桶当前已存储的令牌数storedPermits
与下一个可以分配令牌的时间点 nextFreeTicketMicros
进行了调整,即如果当前时间晚于 nextFreeTicketMicros
,则计算这段时间内产生的令牌数并累加到 令牌桶当前已存储的令牌数storedPermits
上,并更新下次可获取令牌时间 nextFreeTicketMicros
为当前时间。
- 在
SmoothBursty
平滑突发限流实现中,先根据桶中最多可保存的多少秒存入的令牌数maxBurstSeconds
, 创建实例时传入的默认值为1.0
,然后再乘以创建实例时传入的每秒产生的令牌数permitsPerSecond
得到新的最大令牌数maxPermits
并更新实例的相应属性。 同时,计算得到令牌桶当前已存储的令牌数storedPermits
并更新相应实例属性, 计算公式为storedPermits = oldStoredPermits * maxPermits / oldMaxPermits
。 - 在
SmoothWarmingUp
平滑预热限流实现中,计算并更新maxPermits
和storedPermits
的逻辑有些区别,会根据预热期warmupPeriodMicros
进行计算。
#获取限流器的速率
// 对外提供的方法
public final double getRate() {
synchronized(this.mutex()) {
return this.doGetRate();
}
}
// 实际调用方法
final double doGetRate() {
return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
}
// mutex锁
private Object mutex() {
Object mutex = this.mutexDoNotUseDirectly;
if (mutex == null) {
synchronized(this) {
mutex = this.mutexDoNotUseDirectly;
if (mutex == null) {
this.mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}
获取限流器速率的计算方式是使用 1
除以令牌的生成速率 stableIntervalMicros
得出,即 rate = 1 / stableIntervalMicros
。这里为了保证线程安全,实现了一个 mutex
锁。
#阻塞式获取令牌
// 获取一个令牌
public double acquire() {
return acquire(1);
}
// 获取指定数目的令牌
public double acquire(int permits) {
long microsToWait = this.reserve(permits);
this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
}
从 RateLimiter
获取许可,该方法会被阻塞直到获取到请求, 返回值为阻塞时间,单位为秒。
#非阻塞式获取令牌
// 获取令牌带超时时间
public boolean tryAcquire(long timeout, TimeUnit unit) {
return this.tryAcquire(1, timeout, unit);
}
// 获取指定数目令牌不带超时时间
public boolean tryAcquire(int permits) {
return this.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
}
// 获取令牌不带超时时间
public boolean tryAcquire() {
return this.tryAcquire(1, 0L, TimeUnit.MICROSECONDS);
}
// 获取指定数目令牌带超时时间
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = Math.max(unit.toMicros(timeout), 0L);
checkPermits(permits);
long microsToWait;
synchronized(this.mutex()) {
long nowMicros = this.stopwatch.readMicros();
if (!this.canAcquire(nowMicros, timeoutMicros)) {
return false;
}
microsToWait = this.reserveAndGetWaitLength(permits, nowMicros);
}
this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
阻塞式获取令牌,可以指定获取令牌的数目和超时时间,若在超时时间内获取到了令牌,则不会阻塞,否则立即返回 false
。
#实践
#参数配置化
在业务实践中对令牌桶的参数做了动态配置,主要参数包括是否开启限流以及限流的 QPS 。同时为了实现一个通用限流的效果,以键值对的 json 形式存放相应配置, 键表示业务类型,值对象中配置限流器的相关参数如下所示:
{
"business_a": {
"switchOn": false,
"qps": 0.5
},
"business_b": {
"switchOn": true,
"qps": 10
}
}
同时实现一个读取配置的方法,如下:
@Component("rateLimiterConfig")
public class RateLimiterConfig {
/**
* 动态配置的令牌桶限流参数
*/
private String rateLimiterConfig;
/**
* 默认限制QPS
*/
public static final double DEFAULT_LIMITED_QPS = 1.0d;
/**
* 获取令牌桶限流配置
*
* @param rateLimiterType 限流器类型
* @return 结果
*/
public LimiterConfigInfo getRateLimiterConfig(GuavaRateLimiter.RateLimiterType rateLimiterType) {
LimiterConfigInfo configInfo = new LimiterConfigInfo().setSwitchOn(false).setQps(DEFAULT_LIMITED_QPS);
if (StringUtils.isNotBlank(rateLimiterConfig)) {
TypeReference<HashMap<String, LimiterConfigInfo>> typeRef = new TypeReference<HashMap<String, RateLimiterConfig.LimiterConfigInfo>>() {
};
Optional.ofNullable(
Optional.ofNullable(JSON.parseObject(rateLimiterConfig, typeRef))
.orElse(Maps.newHashMap())
.get(String.valueOf(rateLimiterType.getName()))
)
.ifPresent(
config -> {
Optional.ofNullable(config.getSwitchOn()).ifPresent(configInfo::setSwitchOn);
Optional.ofNullable(config.getQps()).ifPresent(configInfo::setQps);
}
);
}
return configInfo;
}
/**
* 限流配置项
*/
@Data
@Accessors(chain = true)
public static class LimiterConfigInfo {
/**
* 限流开关
*/
private Boolean switchOn;
/**
* 限流qps
*/
private Double qps;
}
}
#动态监听配置变化及方法封装
配合限流器配置的动态更新,需要动态对限流器速率进行动态设置。同时,对 RateLimiter
做一层业务封装。
@Slf4j
@SuppressWarnings("all")
@Service("guavaRateLimiter")
public class GuavaRateLimiter {
/**
* 限流器集合
*/
@Getter
private ConcurrentHashMap<RateLimiterType, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
/**
* 限流器配置
*/
@Resource(name = "rateLimiterConfig")
private RateLimiterConfig rateLimiterConfig;
/**
* 限流器配置变更监听器
*
* @param rateLimiterConfigProperty 限流器配置属性
*/
private void onRateLimiterConfigPropertyChange(Property rateLimiterConfigProperty) {
if (Objects.isNull(rateLimiterConfigProperty)) {
return;
}
String configKey = rateLimiterConfigProperty.getKey();
String configValue = Optional.ofNullable(rateLimiterConfigProperty.getValue()).map(o -> (String) o).orElse(null);
log.info("监听到限流配置发生变化, 键: {}, 值: {}", configKey, configValue);
TypeReference<HashMap<String, RateLimiterConfig.LimiterConfigInfo>> typeRef = new TypeReference<HashMap<String, RateLimiterConfig.LimiterConfigInfo>>() {
};
Map<String, RateLimiterConfig.LimiterConfigInfo> rateLimiterConfigMap = JSON.parseObject(configValue, typeRef);
if (MapUtils.isEmpty(rateLimiterConfigMap)) {
rateLimiterMap.clear();
log.info("限流配置为空, 清理本地限流器完成");
}
for (Map.Entry<String, RateLimiterConfig.LimiterConfigInfo> entry : rateLimiterConfigMap.entrySet()) {
RateLimiterType rateLimiterType = RateLimiterType.ofName(entry.getKey());
if (Objects.isNull(rateLimiterType)) {
continue;
}
RateLimiterConfig.LimiterConfigInfo value = entry.getValue();
if (Optional.ofNullable(value.getSwitchOn()).orElse(false)) {
double newQps = Optional.ofNullable(value.getQps()).orElse(RateLimiterConfig.DEFAULT_LIMITED_QPS);
if (rateLimiterMap.containsKey(rateLimiterType)) {
RateLimiter rateLimiter = rateLimiterMap.get(rateLimiterType);
double oldQps = rateLimiter.getRate();
rateLimiter.setRate(newQps);
log.info("【{}限流器】限流开关已开启, 监听到限流速率发生变化, 重新设置限流速率完成, {} -> {}", rateLimiterType.getDesc(), oldQps, newQps);
} else {
rateLimiterMap.put(rateLimiterType, RateLimiter.create(newQps));
log.info("【{}限流器】初始化完成", rateLimiterType.getDesc());
}
} else {
rateLimiterMap.remove(rateLimiterType);
log.info("【{}限流器】限流开关已关闭, 清理本地该限流器完成", rateLimiterType.getDesc());
}
}
}
/**
* 创建限流器
*
* @param rateLimiterType 限流器类型
*/
private void putRateLimiter(RateLimiterType rateLimiterType) {
if (!rateLimiterMap.containsKey(rateLimiterType)) {
rateLimiterMap.put(rateLimiterType, RateLimiter.create(rateLimiterConfig.getRateLimiterConfig(rateLimiterType).getQps()));
log.info("【{}限流器】初始化完成", rateLimiterType.getDesc());
}
}
/**
* 获取限流器
*
* @param rateLimiterType 限流器类型
* @return 结果
*/
private RateLimiter getRateLimiter(RateLimiterType rateLimiterType) {
// 获取时若为null则创建创建并放置
this.putRateLimiter(rateLimiterType);
return rateLimiterMap.get(rateLimiterType);
}
/**
* 阻塞式获取令牌
*
* @param rateLimiterType 限流器类型
*/
public void acquire(RateLimiterType rateLimiterType) {
if (rateLimiterConfig.getRateLimiterConfig(rateLimiterType).getSwitchOn()) {
// 如果开启限流则开始获取令牌
double waitingSeconds = this.getRateLimiter(rateLimiterType).acquire();
log.info("【{}限流器】限流开关已开启, 本次获取令牌成功, 获取令牌等待时间: {}s", rateLimiterType.getDesc(), waitingSeconds);
}
}
/**
* 限流器类型
*/
@Getter
public enum RateLimiterType {
BUSINESS_A(1, "business_a", "业务a"),
BUSINESS_B(1, "business_b", "业务b");
private final int code;
private final String name;
private final String desc;
RateLimiterType(int code, String name, String desc) {
this.code = code;
this.name = name;
this.desc = desc;
}
/**
* 根据code字符串获取枚举项
*
* @param codeStr code字符串
* @return 结果
*/
public static RateLimiterType ofCodeStr(String codeStr) {
for (RateLimiterType value : values()) {
if (String.valueOf(value.getCode()).equals(codeStr)) {
return value;
}
}
return null;
}
/**
* 根据name获取枚举项
*
* @param name 名称
* @return 结果
*/
public static RateLimiterType ofName(String name) {
for (RateLimiterType value : values()) {
if (String.valueOf(value.getName()).equals(name)) {
return value;
}
}
return null;
}
}
}
#后记
本文针对 guava
包的令牌桶限流做了简单介绍,并在业务实践中进行了单机限流。对于分布式限流的原理和设计,后续有时间再做进一步的介绍和实践。