延迟队列的使用场景
- 淘宝订单业务:下单后30min之内没有付款,就自动取消订单。
- 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
- 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
- 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
- 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
- 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。
这类业务的特点就是:延迟工作、失败重试。一种比较笨的方式是使用后台线程遍历所有对象,挨个检查。这种方法虽然简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。
本地延迟队列 DelayQueue
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。
|
|
DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。(注意:不能将null元素放置到这种队列)
但是我们知道,利用DelayQueue实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机的时、消息消费异常的时候做相应的逻辑处理。
基于分布式消息队列RabbitMQ实现延迟队列
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能:
Per-Queue Message TTL RabbitMQ可以对消息和队列设置TTL(过期时间)。
RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
Dead Letter Exchanges 死信消息
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信有以下几种情况:
- 消息被拒绝(basic.reject or basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
DLX同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有死信消息时,RabbitMQ就会自动的将死信消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
结合以上两个特性,就可以模拟出延迟消息的功能.
流程图
源代码
|
|
测试代码
|
|