首页 >> 大全

bucket4j限流示例

2023-11-25 大全 28 作者:考证青年

最近处理测试某个业务的性能,发现当kafka消息量特别大的时候需要限制kafka消息消费速度,因为接受消息的处理流程比较多,当消息量特别大的时候,如果kafka消息了消息(kafka是自动完成)提交给后台处理,一旦后台线程中断,就会导致有消息遗漏处理。

这里补充一句,为什么没有设置kafka手动提交,因为每个消息的处理流程有差异,时间不一,整体上只要接受到kafka消息,完成基本处理,就提交到线程池中了(有的消息要提交给多个线程池), 无法等没有线程池都处理完毕了才手动向kafka提交确认。

是基于令牌桶算法的Java限流库, 主页在。 它主要用在3种场景:

a,限制比较重工作的速率。

b,将限流作为定时器,例如有些场景限制你对服务提供方的调用速度,因此使用限流器作为定时器,定时按照约定速率调用服务提供方。

c,限制对API访问速率。

_限流实现原理_限流器java

示例简介

为了说明问题,我们开发一个简单的工程,就一个Rest API, 该接口会根据租户id进行限流。限流速度:每30秒2次。 因为逻辑比较简单就不再说明,直接看代码。

注意:这个示例只是演示如何使用,实际工程中,对API调用的限流我们一般在API网关处完成。而不是在具体的API处。,本例子只是为了说明如何限流, 因为我们除了对API限流,也会多其他业务处理限流,例如我遇到的处理消息情况。

生成

public class RateLimits {private final LocalBucket bucket;public RateLimits(String limitsConfiguration) {LocalBucketBuilder builder = Bucket4j.builder();boolean initialized = false;for (String limitSrc : limitsConfiguration.split(",")) {long capacity = Long.parseLong(limitSrc.split(":")[0]);long duration = Long.parseLong(limitSrc.split(":")[1]);builder.addLimit(Bandwidth.simple(capacity, Duration.ofSeconds(duration)));initialized = true;}if (initialized) {bucket = builder.build();} else {throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);}}public boolean tryConsume() {return bucket.tryConsume(1);}public long getAvailableTokens() {return bucket.getAvailableTokens();}
}

服务层

@Service
@Slf4j
public class RateLimitSvcImpl implements RateLimitSvc {@Value("${rest.limits.tenant.enabled:false}")private boolean perTenantLimitsEnabled;@Value("${rest.limits.tenant.configuration:}")private String perTenantLimitsConfiguration;private ConcurrentMap<String, RateLimits> perTenantLimits = new ConcurrentHashMap<>();@Overridepublic boolean execRateLimit(String tenantId) {if (perTenantLimitsEnabled) {RateLimits rateLimits = perTenantLimits.computeIfAbsent(tenantId, id -> new RateLimits(perTenantLimitsConfiguration));if (!rateLimits.tryConsume()) {log.info("tryConsume false, tenantId={}, leftToken={}", tenantId, rateLimits.getAvailableTokens());return false;} else {log.info("tryConsume true, tenantId={}, leftToken={}", tenantId, rateLimits.getAvailableTokens());return true;}return true;}
}

具体API


@RestController
@RequestMapping
public class MyController {//实际上我们一般不会再controller中直接进行rateLimit, 而是在网关处,根据租户id,用户id,应用id,或者ip进行限流。//本程序只是RateLimit的例子,不建议直接在生产代码中使用@Autowiredprivate RateLimitSvc rateLimitSvc;@ApiOperation(value = "按用户id查询")@ApiImplicitParams({@ApiImplicitParam(name = "tenantId", value = "tenantId", defaultValue = "001", required = true, dataType = "string", paramType = "path"),})@GetMapping(value = "/tenants/{tenantId}", produces = "application/json;charset=UTF-8")public String getDevice(@PathVariable String tenantId) {if (rateLimitSvc.execRateLimit(tenantId)) {//这里实际中应该是调用设备服务查询数据库,本示例为了简化直接new了一个对象Device device = new Device();device.setId("001");device.setName("一号设备");return JSONObject.toJSONString(device, SerializerFeature.WriteMapNullValue);} else {JSONObject json = new JSONObject();json.put("errMsg", "too many requests");return json.toJSONString();}}
}

完整的代码在这里,欢迎fork, 加星。 谢谢!

效果截图

1, 访问效果

2, 日志信息

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了