限流(rate limiter)
项目中业务提出需求,要求对商品的立即购买接口进行限流。
经过百度及调研,决定在拦截器加限流。拦截器相关讲解见上几篇博客springmv中的拦截器
springmvc限流
RateLimiter
最终的限流代码如下(从百度借鉴,已经找不到出处):
/**
* spring请求限流器
* 可对全局请求或url表达式请求进行限流
* 内部使用spring DispatcherServlet的匹配器PatternsRequestCondition
* 进行url的匹配。
*
* @author ValleyChen
* @version 1.0.0
* @time 2017/1/21
*/
public class RateLimitInterceptor implements HandlerInterceptor {
private RateLimiter globalRateLimiter;
private Properties urlProperties;
private Map<PatternsRequestCondition, RateLimiter> urlRateMap;
private UrlPathHelper urlPathHelper;
private int globalRate;
private List<URLLimitMapping> limitMappings;
public RateLimitInterceptor() {
this(0);
}
/**
* @param globalRate 全局请求限制qps
*/
public RateLimitInterceptor(int globalRate) {
urlPathHelper = new UrlPathHelper();
this.globalRate = globalRate;
if (globalRate > 0)
globalRateLimiter = RateLimiter.create(globalRate);
}
/**
* @param globalRate 全局请求限制qps
*/
public void setGlobalRate(int globalRate) {
this.globalRate = globalRate;
if (globalRate > 0)
globalRateLimiter = RateLimiter.create(globalRate);
}
/**
* url限流
*
* @param urlProperties url表达式->qps propertis
*/
public void setUrlProperties(Properties urlProperties) {
if (urlRateMap == null)
urlRateMap = new ConcurrentHashMap();
fillRateMap(urlProperties, urlRateMap);
this.urlProperties = urlProperties;
}
/**
* url限流
*
* @param limitMappings url表达式 array->qps mapping
*/
public void setLimitMappings(List<URLLimitMapping> limitMappings) {
if (urlRateMap == null)
urlRateMap = new ConcurrentHashMap();
fillRateMap(limitMappings, urlRateMap);
this.limitMappings = limitMappings;
}
/**
* 将url表达转换为PatternsRequestCondition,并生成对应RateLimiter
* 保存
*
* @param controllerProperties
* @param map
*/
private void fillRateMap(Properties controllerProperties, Map<PatternsRequestCondition, RateLimiter> map) {
if (controllerProperties != null) {
for (String key : controllerProperties.stringPropertyNames()) {
String value = controllerProperties.getProperty(key);
if (value.matches("[0-9]*[1-9][0-9]*")) {
map.put(new PatternsRequestCondition(key), RateLimiter.create(Double.valueOf(value)));
} else {
LoggerUtil.log(key + " 的值" + value + " 不是一个合法的限制");
}
}
}
}
/**
* 将url表达转换为PatternsRequestCondition,并生成对应RateLimiter
* 保存
*
* @param limitMappings
* @param map
*/
private void fillRateMap(List<URLLimitMapping> limitMappings, Map<PatternsRequestCondition, RateLimiter> map) {
if (limitMappings != null) {
for (URLLimitMapping mapping : limitMappings) {
map.put(new PatternsRequestCondition(mapping.getUrls()), RateLimiter.create(Double.valueOf(mapping.getRate())));
}
}
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (urlRateMap != null) {
String lookupPath = urlPathHelper.getLookupPathForRequest(request);
for (PatternsRequestCondition patternsRequestCondition : urlRateMap.keySet()) {
//使用spring DispatcherServlet的匹配器PatternsRequestCondition进行匹配
List<String> matches = patternsRequestCondition.getMatchingPatterns(lookupPath);
if (!matches.isEmpty()) {
if (urlRateMap.get(patternsRequestCondition).tryAcquire()) {
LoggerUtil.log(lookupPath + " 请求匹配到" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器");
} else {
LoggerUtil.log(lookupPath + " 请求超过" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器速率");
//return false;
throw new ResponseException(500,"busy");
}
}
}
}
//全局限流
if (globalRateLimiter != null) {
if (!globalRateLimiter.tryAcquire()) {
return false;
}
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
}
}
package com.chen.limit.model;
/**
* @author ValleyChen
* @version 1.0.0
* @time 2017/1/22
*/
public class URLLimitMapping {
private String[] urls;
private int rate=0;
public URLLimitMapping() {
}
public URLLimitMapping(String[] urls, int rate) {
this.urls = urls;
this.rate = rate;
}
public String[] getUrls() {
return urls;
}
public void setUrls(String[] urls) {
this.urls = urls;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
}
public class RequestLimitInterceptor implements HandlerInterceptor ,BeanPostProcessor{
private Logger logger = LoggerFactory.getLogger(RequestLimitInterceptor.class);
private Integer globalRateLimiter = 100;
private Map<PatternsRequestCondition, RateLimiter> urlRateMap;
private Properties urlProperties;
private UrlPathHelper urlPathHelper = new UrlPathHelper();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (urlRateMap != null) {
String lookupPath = urlPathHelper.getLookupPathForRequest(request);
for (PatternsRequestCondition patternsRequestCondition : urlRateMap.keySet()) {
//使用spring DispatcherServlet的匹配器PatternsRequestCondition进行匹配
List<String> matches = patternsRequestCondition.getMatchingPatterns(lookupPath);
if (!matches.isEmpty()) {
if (urlRateMap.get(patternsRequestCondition).tryAcquire(1000, TimeUnit.MILLISECONDS)) {
logger.info(" 请求'{}'匹配到mathes {} ,成功获取令牌,进入请求。" ,lookupPath ,Joiner.on(",").join(patternsRequestCondition.getPatterns()) );
} else {
logger.info( " 请求'{}'匹配到mathes {},超过限流速率,获取令牌失败。" ,lookupPath ,Joiner.on(",").join(patternsRequestCondition.getPatterns()));
return false;
}
}
}
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
}
/**
* 限流的 URL与限流值的K/V 值
*
* @param urlProperties
*/
public void setUrlProperties(Properties urlProperties) {
this.urlProperties = urlProperties;
}
public void setGlobalRateLimiter(Integer globalRateLimiter) {
this.globalRateLimiter = globalRateLimiter;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(RequestMappingHandlerMapping.class.isAssignableFrom(bean.getClass())){
if(urlRateMap==null){
urlRateMap = new ConcurrentHashMap<>();
}
logger.info("we get all the controllers's methods and assign it to urlRateMap");
RequestMappingHandlerMapping requestMappingHandlerMapping = (RequestMappingHandlerMapping)bean;
Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
for (RequestMappingInfo rmi : handlerMethods.keySet()) {
PatternsRequestCondition pc = rmi.getPatternsCondition();
urlRateMap.put(pc,RateLimiter.create(globalRateLimiter));
}
if(urlProperties!=null){
for(String urlPatterns :urlProperties.stringPropertyNames()){
String limit = urlProperties.getProperty(urlPatterns);
if(!limit.matches("^-?\\d+$"))
logger.error("the value {} for url patterns {} is not a number ,please check it ",limit,urlPatterns);
urlRateMap.put(new PatternsRequestCondition(urlPatterns), RateLimiter.create(Integer.parseInt(limit)));
}
}
}
return bean;
}
}
应用
拦截器配置,可以统一配置所有请求的上限,也可以单独对某个 url配置,该拦截器是基于 SpringMvc 的RequestMappingHandlerMapping获取url 进行操作。
<bean id="requestLimitInterceptor" class="cn.fraudmetrix.creditcloud.app.intercepters.RequestLimitInterceptor">
<property name="globalRateLimiter" value="100" />
<property name="urlProperties">
<props>
<prop key="/creditcloud/test">100</prop>
</props>
</property>
</bean>
<!--拦截器配置-->
<mvc:interceptors>
<ref bean="requestLimitInterceptor" />
</mvc:interceptors>
压测结果:
原理分析
谈一谈我的理解:
采用令牌桶算法,想象有一个固定容量的桶,以稳定的速率生成令牌放在桶中。当有请求时,需要获取令牌才能通过。因为桶的容量固定,令牌满了就不生产了。桶中有充足的令牌时,突发的流量可以直接获取令牌通过。当令牌取光后,桶空了,后面的请求就得等生成令牌后才能通过,这种情况下请求通过的速率就变得稳定了。
关键词:令牌生成速率固定,能取到令牌即可通过。
令牌桶的优点是可用处理突发流量。
作者用令牌桶的方式卖包子的例子进行解释:
包子铺每天早上7点准时开门,师傅开始制作这60屉小笼包【RateLimiter rateLimiter = RateLimiter.create(60)】。顾客蜂拥而至排队,第一个顾客先付一屉小笼包的钱【rateLimiter.acquire()】,等一分钟后小笼包出锅,便可以进屋吃包子了。这时伙计会和顾客说:“您本次等待了1分钟,久等了。”【rateLimiter.acquire()返回的值】。之后是第二个顾客,付钱,等一分钟,进屋吃包子。于是,早上大家就这样一直稳定有序的进屋吃包子。这事来了个火急火燎的上班族,说:“我着急吃饭,不想等,有现成的包子吗?”【rateLimiter.tryAcquire()】伙计看了一眼后厨说:“没有现成的包子,需要等会哦。”一看还要等,第三个顾客就走了。
早高峰过去后,买包子的人少了,师傅做完60个包子后,就休息了。
中午12点,隔壁科技园的程序员们下班了,顾客蜂拥而至。因为已经有了60个库存,大家不用等,付了钱就可以直接进屋吃包子,屋子里一下就涌入了60个顾客。看到小笼包都买光了,后厨师傅又赶紧忙了起来。
这时来了个大胃王,大嗓门喊道:“我想要30屉包子。”【rateLimiter.acquire(30)】伙计一愣,心想,这一下也做不出这么多包子呀。但是本着客户至上的原则,他对大胃王说:“好的先生,不过我们的包子都是现做的,您先吃着,我们蒸好了陆陆续续给您端上来可以吗?”大胃王说:“哈哈,好!好饭不怕晚!”于是交钱进门吃包子了。这时候又来了个顾客A,说我想要一屉包子。伙计心里一算,现在是一点,等刚才那个大胃王要的包子上齐了,才能给这位顾客上包子,所以需要等到1:30才能给这位顾客吃上,好惨一顾客。于是说:“不好意思这位顾客,小笼包现在没有了,您需要等一段时间。”顾客A说:“好吧,我交了钱等着吧。”于是过了半个小时,后厨做出来了第31屉包子,顾客A就满足的进屋吃包子了。
因此,令牌桶算法有两个参数限制:
1.桶大小(最大令牌数限制)
2.补给率(匀速生成速率)
稍后,博主又介绍了:
两种获取令牌的方法–
acquire是直接获取令牌,如果还没有到达接受下一个请求的时间点,就继续等待。而tryAcquire则如其名,我先试试,如果在我允许的时间范围内获取不到就不等了,如果能获取到那我就等着。
两种实现方式–
SmoothBursty:平滑突发限流,以稳定的速率生成令牌。
SmoothWarmingUp:平滑预热限流,随着请求量的增加,令牌生成速率会缓慢提升直到一个稳定的速率。
Dubbo服务端限流
并发控制
限制com.foo.BarService的每个方法,服务端并发执行(或占用线程池线程数)不能超过10个:
<dubbo:service interface="com.foo.BarService" executes="10" />
限制com.foo.BarService的sayHello方法,服务器并发执行(或占用线程池线程数)不能超过10个。
<dubbo:service interface="com.foo.BarService">
<dubbo:method name="sayHello" executes="10" />
</dubbo:service>
actives限流
该限流方式与前两种不同,其可以设置在提供端,也可以设置在消费者端。可以设置为接口级别,也可以设置为方法级别。
根据消费者与提供者建立的连接类型,其意义也不同。
长连接: 表示当前的长连接最多可以处理的请求个数。与长连接的数量没有问题。
短连接:表示当前服务可以同时处理的短连接数量。
类级别
<dubbo:service interface="com.foo.BarService" actives="10" />
<dubbo:reference interface="com.foo.BarService" actives="10" />
方法级别
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="10" />
</dubbo:service>
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="10" />
</dubbo:service>
Nginx的API限流(http):
API限流(http)
Nginx
对于Nginx接入层限流可以使用Nginx自带了两个模块:
连接数限流模块ngx_http_limit_conn_module和漏桶算法实现的请求限流模块ngx_http_limit_req_module。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!