极光推送服务限流方案

如何让系统在汹涌澎湃的流量面前谈笑风生?我们的策略是不要让系统超负荷工作。如果现有的系统扛不住业务目标怎么办?加机器!机器不够怎么办?业务降级,服务限流!

正所谓「他强任他强,清风拂山岗;他横任他横,明月照大江」,降级和限流是系统可用性保障中必不可少的神兵利器,丢卒保车,以暂停边缘业务为代价保障核心业务的资源,以系统不被突发流量压挂为第一要务。

现状

JPush API 频率控制

JPush API对访问次数,具有频率控制。即一定的时间窗口内,API允许调用的次数是有限制的。免费版:1分钟600次。

超出后:

1
{"error":{"code":2002,"message":"Request times of the app_key exceed the limit of current time window"}}

各业务组直接对接极光API

目前各业务线都在使用极光推送服务,并且是直接调用极光API。水归一源,终汇聚一处。想实现流量管控,只有将调用出口整合到一处,方可达到限流管控的目的。

现状

解决方案

令牌桶算法

令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。令牌桶属于控制速率类型的。令牌桶算法最初来源于计算机网络。在网络传输数据时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。

在 Wikipedia 上,令牌桶算法是这么描述的:

  1. 每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌。
  2. 桶中最多存放 b 个令牌,如果桶满了,新放入的令牌会被丢弃。
  3. 当一个 n 字节的数据包到达时,消耗 n 个令牌,然后发送该数据包。
  4. 如果桶中可用令牌小于 n,则该数据包将被缓存或丢弃。

令牌桶

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。传送到令牌桶的数据包需要消耗令牌。不同大小的数据包,消耗的令牌数量不一样。

令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。

基于令牌桶算法改造消息中心,流程图如下:

限流方案

单机限流

Google Guava RateLimiter就是令牌桶算法的实现,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。与Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。

1
2
3
4
5
6
7
8
9
//每秒两个许可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // 拿不到许可就等待
executor.execute(task);
}
}

通过设置许可证的速率来定义RateLimiter。在默认配置下,许可证会在固定的速率下被分配,速率单位是每秒多少个许可证。为了确保维护配置的速率,许可会被平稳地分配,许可之间的延迟会做调整。可能存在配置一个拥有预热期的RateLimiter的情况,在这段时间内,每秒分配的许可数会稳定地增长直到达到稳定的速率。

分布式限流

Remote Dictionary Server(Redis)是一个基于 key-value 键值对的持久化数据库存储系统。支持多种数据结构,包括 string (字符串)、list (链表)、set (集合)、zset (sorted set –有序集合)和 hash(哈希类型)。这些数据类型都支持 push/pop、add/remove 及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。

想实现分布式限流,非Redis莫属。Redis限流的大体思路是设置一个带有过期时间的key,每次申请令牌时,将 key 所储存的值加上增量 increment, 判断key 所存储的值是否达到上限,未达到上限,则获得许可。如有需要,亦可阻塞当前线程直到获得许可。

限流相关的命令:

事务

MULTI、EXEC、DISCARD和WATCH命令是Redis事务功能的基础,Redis事务允许在一次单独的步骤中执行一组命令,并且可以保证如下两个重要事项:

  • Redis会将一个事务中的所有命令序列化,然后按顺序执行。

    Redis不可能在一个Redis事务的执行过程中插入执行另一个客户端发出的请求。这样便能保证Redis将这些命令作为一个单独的隔离操作执行。

  • 在一个Redis事务中,Redis要么执行其中的所有命令,要么什么都不执行。

    因此,Redis事务能够保证原子性。EXEC命令会触发执行事务中的所有命令。因此,当某个客户端正在执行一次事务时,如果它在调用MULTI命令之前就从Redis服务端断开连接,那么就不会执行事务中的任何操作;相反,如果它在调用EXEC命令之后才从Redis服务端断开连接,那么就会执行事务中的所有操作。当Redis使用只增文件(AOF:Append-only File)时,Redis能够确保使用一个单独的write(2)系统调用,这样便能将事务写入磁盘。然而,如果Redis服务器宕机,或者系统管理员以某种方式停止Redis服务进程的运行,那么Redis很有可能只执行了事务中的一部分操作。Redis将会在重新启动时检查上述状态,然后退出运行,并且输出报错信息。使用redis-check-aof工具可以修复上述的只增文件,这个工具将会从上述文件中删除执行不完全的事务,这样Redis服务器才能再次启动。

从2.2版本开始,除了上述两项保证之外,Redis还能够以乐观锁(Watch)的形式提供更多的保证,这种形式非常类似于”检查再设置”(CAS:Check And Set)操作。

从2.6版本开始提供脚本(Lua scripting)能力,一种更灵活的批量命令组织方式用于取代目前的事务机制。脚本提供了更强大和灵活的编程能力,但也是一把双刃剑,由于 Redis 需要保证脚本执行的原子性和隔离性,脚本执行期间会阻塞其他命令的执行,因此建议使用高效的脚本完成业务。

SET

SET key value [EX seconds] [PX milliseconds] [NX|XX]

将字符串值 value 关联到 key 。如果 key 已经持有其他值, SET 就覆写旧值,无视类型。对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

INCRBY

将 key 所储存的值加上增量 increment 。如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。

Redis官方基于上述命令提供的限流代码:https://redis.io/commands/incr#pattern-rate-limiter-1

代码验证

使用Jedis client进行限流方案验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package hbec.app.stock.ratelimiter;
/**
* @author roc
* @date 2018/01/24
*/
public interface RateLimiter {
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the result {@code LimiterResult#acquired is true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* @param permits the number of permits to acquire
* @return acquire result {@code true} if a permit was acquired,
* and {@code false is false} otherwise
*/
boolean tryAcquire(int permits);
/**
* Acquires a permit from this method, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* @param permits the number of permits to acquire
*/
void acquire(int permits) throws InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package hbec.app.stock.ratelimiter;
import com.google.common.base.Preconditions;
import hbec.app.stock.commons.redis.JedisTemplate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @author roc
* @date 2018/01/23
*/
public class RedisRateLimiter implements RateLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisRateLimiter.class);
private static final String ZERO = "0";
private static final String NOT_EXISTS = "NX";
private static final String P_EXPIRE = "PX";
public static final int NOT_SET = -1;
private static final Random TIME_RANDOM = new Random();
/**
* 限流标识,redis key
*/
private String rateLimiterName;
/**
* 存活时间
*/
private int timeToLive;
/**
* 许可上限
*/
private int upperLimitPermits;
private JedisTemplate jedisTemplate;
@Override
public boolean tryAcquire(final int permits) {
Preconditions.checkArgument(permits > 0, "The number of permits must be greater than 0.");
return acquireSync(permits).isAcquired();
}
@Override
public void acquire(final int permits) throws InterruptedException {
Preconditions.checkArgument(permits > 0, "The number of permits must be greater than 0.");
LimiterResult limiterResult;
do {
limiterResult = acquireSync(permits);
if (!limiterResult.isAcquired()) {
long sleepTime = limiterResult.getRemainderTTL() + TIME_RANDOM.nextInt(5);
Thread.sleep(sleepTime);
}
} while (!limiterResult.isAcquired());
}
public LimiterResult acquireSync(final int permits) {
return jedisTemplate.execute(new JedisTemplate.JedisAction<LimiterResult>() {
@Override
public LimiterResult action(Jedis jedis) {
Transaction transaction = jedis.multi();
transaction.set(rateLimiterName, ZERO, NOT_EXISTS, P_EXPIRE, timeToLive);
Response<Long> currentQPS = transaction.incrBy(rateLimiterName, permits);
Response<Long> keyTTL = transaction.pttl(rateLimiterName);
transaction.exec();
Long currentPermit = currentQPS.get();
Long remainderTTL = keyTTL.get();
boolean acquired = currentPermit <= upperLimitPermits;
if (!acquired && remainderTTL == NOT_SET) {
LOGGER.error("Warning! Set ttl false, key={}, current ttl={}", rateLimiterName, remainderTTL);
jedis.expire(rateLimiterName, timeToLive);
remainderTTL = (long) timeToLive;
}
LOGGER.debug("rateLimiterName={}, passed={}, remainderTTL={}, time={}",
rateLimiterName, acquired ? "true" : "false",
remainderTTL, currentPermit);
return LimiterResult.newInstance(acquired, remainderTTL);
}
});
}
public static Builder newBuilder(JedisTemplate jedisTemplate) {
return new Builder(jedisTemplate);
}
public static final class Builder {
private JedisTemplate jedisTemplate;
private String rateLimiterName;
private int timeToLive;
private int upperLimitPermits;
private Builder(JedisTemplate jedisTemplate) {
Preconditions.checkNotNull(jedisTemplate, "JedisTemplate can't be null.");
this.jedisTemplate = jedisTemplate;
}
public Builder setRateLimiterName(String rateLimiterName) {
this.rateLimiterName = rateLimiterName;
return this;
}
public Builder setTimeToLive(int timeToLive, TimeUnit timeUnit) {
this.timeToLive = (int) timeUnit.toMillis(timeToLive);
return this;
}
public Builder setUpperLimitPermits(int upperLimitPermits) {
this.upperLimitPermits = upperLimitPermits;
return this;
}
public RedisRateLimiter build() {
Preconditions.checkArgument(StringUtils.isNotBlank(rateLimiterName), "RateLimiterName can't be blank.");
Preconditions.checkArgument(upperLimitPermits > 0, "UpperLimitPermits must be an integer greater than 0.");
Preconditions.checkArgument(timeToLive > 0, "TimeToLive must be an integer greater than 0.");
RedisRateLimiter redisRateLimiter = new RedisRateLimiter();
redisRateLimiter.jedisTemplate = this.jedisTemplate;
redisRateLimiter.upperLimitPermits = this.upperLimitPermits;
redisRateLimiter.rateLimiterName = this.rateLimiterName;
redisRateLimiter.timeToLive = this.timeToLive;
return redisRateLimiter;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package hbec.app.stock.ratelimiter;
/**
* @author roc
* @date 2018/01/24
*/
public class LimiterResult {
private boolean acquired;
private Long remainderTTL;
public static LimiterResult newInstance(boolean acquired, Long remainderTTL) {
LimiterResult limiterResult = new LimiterResult();
limiterResult.setAcquired(acquired);
limiterResult.setRemainderTTL(remainderTTL);
return limiterResult;
}
public boolean isAcquired() {
return acquired;
}
public Long getRemainderTTL() {
return remainderTTL;
}
public void setAcquired(boolean acquired) {
this.acquired = acquired;
}
public void setRemainderTTL(Long remainderTTL) {
this.remainderTTL = remainderTTL;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("LimiterResult{");
sb.append("acquired=").append(acquired);
sb.append(", remainderTTL=").append(remainderTTL);
sb.append('}');
return sb.toString();
}
}

JedisTemplate.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
package hbec.app.stock.commons.redis;
import hbec.app.stock.redis.JedisUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
/**
* JedisTemplate 提供了一个template方法,负责对Jedis连接的获取与归还。
* JedisAction<T> 和 JedisActionNoResult两种回调接口,适用于有无返回值两种情况。
* PipelineAction 与 PipelineActionResult两种接口,适合于pipeline中批量传输命令的情况。
*
* 同时提供一些JedisOperation中定义的 最常用函数的封装, 如get/set/zadd等。
*
* @author roc
*/
public class JedisTemplate {
private static Logger logger = LoggerFactory.getLogger(JedisTemplate.class);
private JedisPool jedisPool;
public JedisTemplate(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* Callback interface for template.
*/
public interface JedisAction<T> {
T action(Jedis jedis);
}
/**
* Callback interface for template without result.
*/
public interface JedisActionNoResult {
void action(Jedis jedis);
}
/**
* Callback interface for template.
*/
public interface PipelineAction {
List<Object> action(Pipeline Pipeline);
}
/**
* Callback interface for template without result.
*/
public interface PipelineActionNoResult {
void action(Pipeline Pipeline);
}
/**
* Execute with a call back action with result.
*/
public <T> T execute(JedisAction<T> jedisAction) throws JedisException {
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
return jedisAction.action(jedis);
} catch (JedisException e) {
broken = handleJedisException(e);
throw e;
} finally {
closeResource(jedis, broken);
}
}
/**
* Execute with a call back action without result.
*/
public void execute(JedisActionNoResult jedisAction) throws JedisException {
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
jedisAction.action(jedis);
} catch (JedisException e) {
broken = handleJedisException(e);
throw e;
} finally {
closeResource(jedis, broken);
}
}
/**
* Execute with a call back action with result in pipeline.
*/
public List<Object> execute(PipelineAction pipelineAction) throws JedisException {
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
pipelineAction.action(pipeline);
return pipeline.syncAndReturnAll();
} catch (JedisException e) {
broken = handleJedisException(e);
throw e;
} finally {
closeResource(jedis, broken);
}
}
/**
* Execute with a call back action without result in pipeline.
*/
public void execute(PipelineActionNoResult pipelineAction) throws JedisException {
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
pipelineAction.action(pipeline);
pipeline.sync();
} catch (JedisException e) {
broken = handleJedisException(e);
throw e;
} finally {
closeResource(jedis, broken);
}
}
/**
* Handle jedisException, write log and return whether the connection is
* broken.
*/
protected boolean handleJedisException(JedisException jedisException) {
if (jedisException instanceof JedisConnectionException) {
logger.error("Redis connection lost.", jedisException);
} else if (jedisException instanceof JedisDataException) {
if ((jedisException.getMessage() != null) && (jedisException.getMessage().indexOf("READONLY") != -1)) {
logger.error("Redis connection are read-only slave.", jedisException);
} else {
// dataException, isBroken=false
return false;
}
} else {
logger.error("Jedis exception happen.", jedisException);
}
return true;
}
/**
* Return jedis connection to the pool, call different return methods
* depends on the conectionBroken status.
*/
protected void closeResource(Jedis jedis, boolean conectionBroken) {
try {
if (conectionBroken) {
jedisPool.returnBrokenResource(jedis);
} else {
jedisPool.returnResource(jedis);
}
} catch (Exception e) {
logger.error("return back jedis failed, will fore close the jedis.", e);
}
}
// / Common Actions ///
/**
* Remove the specified keys. If a given key does not exist no operation is
* performed for this key.
*
* return false if one of the key is not exist.
*/
public Boolean del(final String... keys) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.del(keys) == keys.length ? true : false;
}
});
}
// / String Actions ///
/**
* Get the value of the specified key. If the key does not exist null is
* returned. If the value stored at key is not a string an error is returned
* because GET can only handle string values.
*/
public String get(final String key) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.get(key);
}
});
}
/**
* Get the value of the specified key as Long.If the key does not exist null
* is returned.
*/
public Long getAsLong(final String key) {
String result = get(key);
return result != null ? Long.valueOf(result) : null;
}
/**
* Get the value of the specified key as Integer.If the key does not exist
* null is returned.
*/
public Integer getAsInt(final String key) {
String result = get(key);
return result != null ? Integer.valueOf(result) : null;
}
/**
* Get the value of the specified key as Boolean.If the key does not exist
* null is returned.
*/
public Boolean getAsBoolean(final String key) {
String result = get(key);
return result != null ? Boolean.valueOf(result) : null;
}
/**
* Get the values of all the specified keys. If one or more keys dont exist
* or is not of type String, a 'nil' value is returned instead of the value
* of the specified key, but the operation never fails.
*/
public List<String> mget(final String... keys) {
return execute(new JedisAction<List<String>>() {
@Override
public List<String> action(Jedis jedis) {
return jedis.mget(keys);
}
});
}
/**
* Set the string value as value of the key. The string can't be longer than
* 1073741824 bytes (1 GB).
*/
public void set(final String key, final String value) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.set(key, value);
}
});
}
/**
* The command is exactly equivalent to the following group of commands:
* {@link #set(String, String) SET} + {@link #expire(String, int) EXPIRE}.
* The operation is atomic.
*/
public void setex(final String key, final String value, final int seconds) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.setex(key, seconds, value);
}
});
}
/**
* SETNX works exactly like {@link #setNX(String, String) SET} with the only
* difference that if the key already exists no operation is performed.
* SETNX actually means "SET if Not eXists".
*
* return true if the key was set.
*/
public Boolean setnx(final String key, final String value) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.setnx(key, value) == 1 ? true : false;
}
});
}
/**
* The command is exactly equivalent to the following group of commands:
* {@link #setex(String, String, int) SETEX} +
* {@link #sexnx(String, String) SETNX}. The operation is atomic.
*/
public Boolean setnxex(final String key, final String value, final int seconds) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
String result = jedis.set(key, value, "NX", "EX", seconds);
return JedisUtils.isStatusOk(result);
}
});
}
/**
* GETSET is an atomic set this value and return the old value command. Set
* key to the string value and return the old value stored at key. The
* string can't be longer than 1073741824 bytes (1 GB).
*/
public String getSet(final String key, final String value) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.getSet(key, value);
}
});
}
/**
* Increment the number stored at key by one. If the key does not exist or
* contains a value of a wrong type, set the key to the value of "0" before
* to perform the increment operation.
* <p>
* INCR commands are limited to 64 bit signed integers.
* <p>
* Note: this is actually a string operation, that is, in Redis there are
* not "integer" types. Simply the string stored at the key is parsed as a
* base 10 64 bit signed integer, incremented, and then converted back as a
* string.
*
* @return Integer reply, this commands will reply with the new value of key
* after the increment.
*/
public Long incr(final String key) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.incr(key);
}
});
}
public Long incrBy(final String key, final long increment) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.incrBy(key, increment);
}
});
}
public Double incrByFloat(final String key, final double increment) {
return execute(new JedisAction<Double>() {
@Override
public Double action(Jedis jedis) {
return jedis.incrByFloat(key, increment);
}
});
}
/**
* Decrement the number stored at key by one. If the key does not exist or
* contains a value of a wrong type, set the key to the value of "0" before
* to perform the decrement operation.
*/
public Long decr(final String key) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.decr(key);
}
});
}
public Long decrBy(final String key, final long decrement) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.decrBy(key, decrement);
}
});
}
// / Hash Actions ///
/**
* If key holds a hash, retrieve the value associated to the specified
* field.
* <p>
* If the field is not found or the key does not exist, a special 'nil'
* value is returned.
*/
public String hget(final String key, final String fieldName) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.hget(key, fieldName);
}
});
}
public List<String> hmget(final String key, final String... fieldsNames) {
return execute(new JedisAction<List<String>>() {
@Override
public List<String> action(Jedis jedis) {
return jedis.hmget(key, fieldsNames);
}
});
}
public Map<String, String> hgetAll(final String key) {
return execute(new JedisAction<Map<String, String>>() {
@Override
public Map<String, String> action(Jedis jedis) {
return jedis.hgetAll(key);
}
});
}
public void hset(final String key, final String fieldName, final String value) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.hset(key, fieldName, value);
}
});
}
public void hmset(final String key, final Map<String, String> map) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.hmset(key, map);
}
});
}
public Boolean hsetnx(final String key, final String fieldName, final String value) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.hsetnx(key, fieldName, value) == 1 ? true : false;
}
});
}
public Long hincrBy(final String key, final String fieldName, final long increment) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.hincrBy(key, fieldName, increment);
}
});
}
public Double hincrByFloat(final String key, final String fieldName, final double increment) {
return execute(new JedisAction<Double>() {
@Override
public Double action(Jedis jedis) {
return jedis.hincrByFloat(key, fieldName, increment);
}
});
}
public Long hdel(final String key, final String... fieldsNames) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.hdel(key, fieldsNames);
}
});
}
public Boolean hexists(final String key, final String fieldName) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.hexists(key, fieldName);
}
});
}
public Set<String> hkeys(final String key) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.hkeys(key);
}
});
}
public Long hlen(final String key) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.hlen(key);
}
});
}
// / List Actions ///
public Long lpush(final String key, final String... values) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.lpush(key, values);
}
});
}
public String rpop(final String key) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.rpop(key);
}
});
}
public String brpop(final String key) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
List<String> nameValuePair = jedis.brpop(key);
if (nameValuePair != null) {
return nameValuePair.get(1);
} else {
return null;
}
}
});
}
public String brpop(final int timeout, final String key) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
List<String> nameValuePair = jedis.brpop(timeout, key);
if (nameValuePair != null) {
return nameValuePair.get(1);
} else {
return null;
}
}
});
}
/**
* Not support for sharding.
*/
public String rpoplpush(final String sourceKey, final String destinationKey) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.rpoplpush(sourceKey, destinationKey);
}
});
}
/**
* Not support for sharding.
*/
public String brpoplpush(final String source, final String destination, final int timeout) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.brpoplpush(source, destination, timeout);
}
});
}
public Long llen(final String key) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.llen(key);
}
});
}
public String lindex(final String key, final long index) {
return execute(new JedisAction<String>() {
@Override
public String action(Jedis jedis) {
return jedis.lindex(key, index);
}
});
}
public List<String> lrange(final String key, final int start, final int end) {
return execute(new JedisAction<List<String>>() {
@Override
public List<String> action(Jedis jedis) {
return jedis.lrange(key, start, end);
}
});
}
public void ltrim(final String key, final int start, final int end) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.ltrim(key, start, end);
}
});
}
public void ltrimFromLeft(final String key, final int size) {
execute(new JedisActionNoResult() {
@Override
public void action(Jedis jedis) {
jedis.ltrim(key, 0, size - 1);
}
});
}
public Boolean lremFirst(final String key, final String value) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
Long count = jedis.lrem(key, 1, value);
return (count == 1);
}
});
}
public Boolean lremAll(final String key, final String value) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
Long count = jedis.lrem(key, 0, value);
return (count > 0);
}
});
}
// / Set Actions ///
public Boolean sadd(final String key, final String member) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.sadd(key, member) == 1 ? true : false;
}
});
}
public Set<String> smembers(final String key) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.smembers(key);
}
});
}
// / Ordered Set Actions ///
/**
* return true for add new element, false for only update the score.
*/
public Boolean zadd(final String key, final double score, final String member) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.zadd(key, score, member) == 1 ? true : false;
}
});
}
public Double zscore(final String key, final String member) {
return execute(new JedisAction<Double>() {
@Override
public Double action(Jedis jedis) {
return jedis.zscore(key, member);
}
});
}
public Long zrank(final String key, final String member) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zrank(key, member);
}
});
}
public Long zrevrank(final String key, final String member) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zrevrank(key, member);
}
});
}
public Long zcount(final String key, final double min, final double max) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zcount(key, min, max);
}
});
}
public Set<String> zrange(final String key, final int start, final int end) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.zrange(key, start, end);
}
});
}
public Set<Tuple> zrangeWithScores(final String key, final int start, final int end) {
return execute(new JedisAction<Set<Tuple>>() {
@Override
public Set<Tuple> action(Jedis jedis) {
return jedis.zrangeWithScores(key, start, end);
}
});
}
public Set<String> zrevrange(final String key, final int start, final int end) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.zrevrange(key, start, end);
}
});
}
public Set<Tuple> zrevrangeWithScores(final String key, final int start, final int end) {
return execute(new JedisAction<Set<Tuple>>() {
@Override
public Set<Tuple> action(Jedis jedis) {
return jedis.zrevrangeWithScores(key, start, end);
}
});
}
public Set<String> zrangeByScore(final String key, final double min, final double max) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.zrangeByScore(key, min, max);
}
});
}
public Set<Tuple> zrangeByScoreWithScores(final String key, final double min, final double max) {
return execute(new JedisAction<Set<Tuple>>() {
@Override
public Set<Tuple> action(Jedis jedis) {
return jedis.zrangeByScoreWithScores(key, min, max);
}
});
}
public Set<String> zrevrangeByScore(final String key, final double max, final double min) {
return execute(new JedisAction<Set<String>>() {
@Override
public Set<String> action(Jedis jedis) {
return jedis.zrevrangeByScore(key, max, min);
}
});
}
public Set<Tuple> zrevrangeByScoreWithScores(final String key, final double max, final double min) {
return execute(new JedisAction<Set<Tuple>>() {
@Override
public Set<Tuple> action(Jedis jedis) {
return jedis.zrevrangeByScoreWithScores(key, max, min);
}
});
}
public Boolean zrem(final String key, final String member) {
return execute(new JedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
return jedis.zrem(key, member) == 1 ? true : false;
}
});
}
public Long zremByScore(final String key, final double start, final double end) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zremrangeByScore(key, start, end);
}
});
}
public Long zremByRank(final String key, final long start, final long end) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zremrangeByRank(key, start, end);
}
});
}
public Long zcard(final String key) {
return execute(new JedisAction<Long>() {
@Override
public Long action(Jedis jedis) {
return jedis.zcard(key);
}
});
}

验证代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package hbec.app.stock.ratelimiter;
import hbec.app.stock.commons.redis.JedisTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author roc
* @date 2018/01/26
*/
public class RedisRateLimiterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisRateLimiterTest.class);
private RateLimiter rateLimiter;
private AtomicInteger atomicInteger = new AtomicInteger(1);
@BeforeTest
public void init() {
JedisPool jedisPool = new JedisPool("10.0.30.66", 6379);
JedisTemplate jedisTemplate = new JedisTemplate(jedisPool);
String rateLimiterName = "rateLimiterROC";
rateLimiter = RedisRateLimiter.newBuilder(jedisTemplate).setRateLimiterName(rateLimiterName)
.setUpperLimitPermits(10).setTimeToLive(5, TimeUnit.SECONDS).build();
}
@Test(threadPoolSize = 10, invocationCount = 20)
public void testTryAcquire() throws Exception {
boolean acquired = rateLimiter.tryAcquire(1);
if (acquired) {
LOGGER.info("thread {} acquire the permit {} times.", Thread.currentThread().getId(), atomicInteger.getAndIncrement());
}
}
@Test(threadPoolSize = 10, invocationCount = 20)
public void testAcquire() throws Exception {
rateLimiter.acquire(1);
LOGGER.info("thread {} acquire the permit {} times.", Thread.currentThread().getId(), atomicInteger.getAndIncrement());
}
}

测试用例

10个线程20次请求,限制5s内10次请求。

测试结果

10次请求在5s内正常通过,剩下10次请求等待下个时间窗口轮询。

测试结果