基于Rabbitmq实现延迟队列

延迟队列的使用场景

  1. 淘宝订单业务:下单后30min之内没有付款,就自动取消订单。
  2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
  3. 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
  4. 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
  5. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
  6. 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。

这类业务的特点就是:延迟工作、失败重试。一种比较笨的方式是使用后台线程遍历所有对象,挨个检查。这种方法虽然简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。

本地延迟队列 DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

1
2
DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。(注意:不能将null元素放置到这种队列)

但是我们知道,利用DelayQueue实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机的时、消息消费异常的时候做相应的逻辑处理。

基于分布式消息队列RabbitMQ实现延迟队列

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能:

Per-Queue Message TTL RabbitMQ可以对消息和队列设置TTL(过期时间)。

RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。

Dead Letter Exchanges 死信消息

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信有以下几种情况:

  1. 消息被拒绝(basic.reject or basic.nack)并且requeue=false
  2. 消息TTL过期
  3. 队列达到最大长度

DLX同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有死信消息时,RabbitMQ就会自动的将死信消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

结合以上两个特性,就可以模拟出延迟消息的功能.

流程图

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package hbec.app.stock.rabbitmq.utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
/**
* @Description <strong>基于RabbitMQ实现的分布式延迟重试队列</strong>
*
* <ul>
* <li>delayExchangeName : 交换器名称</li>
* <li>delayQueueName : 延迟队列名称</li>
* <li>delayRoutingKeyName : 路由器名称</li>
* <li>perDelayQueueMessageTTL : 延迟队列中message的默认ttl</li>
* </ul>
* 通过{@link RabbitMQDelayQueue#put(byte[], long, TimeUnit)}首次进入延迟队列的消息,
* 其ttl = min(message ttl, per queue message ttl),
* 消息被Reject/nack之后变成死信消息,其自带message ttl失效,
* 以后将按照{@link #perDelayQueueMessageTTL}指定的延迟时间投递给经由{@link RabbitMQDelayQueue#consumerRegister}注册的消费者,直到消息被Ack.
*
* @author roc roc.fly@qq.com
* @date Dec 9, 2016 3:29:39 PM
*/
public class RabbitMQDelayQueue {
private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);
private static final String POSTFIX_TASK = "_task";
// direct类型 交换器
public static final String EXCHANGE_TYPE_DIRECT = "direct";
private Connection connection;
//注册消费者
private ConsumerRegister consumerRegister;
//任务队列配置
private String taskExchangeName;
private String taskQueueName;
private String taskRoutingKeyName;
//延迟队列配置
private String delayExchangeName;
private String delayQueueName;
private String delayRoutingKeyName;
//延迟队列中的消息ttl
private long perDelayQueueMessageTTL;
public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName, String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws IOException {
this.connection = connection;
this.consumerRegister = consumerRegister;
this.delayExchangeName = delayExchangeName;
this.delayQueueName = delayQueueName;
this.delayRoutingKeyName = delayRoutingKeyName;
this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
this.taskQueueName = delayQueueName + POSTFIX_TASK;
this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
init();
registerConsumer();
}
/**
*
* @Description 注册消费者
* @author roc roc.fly@qq.com
* @date Dec 29, 2016 1:36:25 PM
*/
public interface ConsumerRegister {
public Consumer register(Channel channel) throws IOException;
}
/**
* 注册带有ttl的queue和对应的任务队列
*
* @throws IOException
* @author roc
*/
private void init() throws IOException {
Channel channel = connection.createChannel();
channel.exchangeDeclare(taskExchangeName, EXCHANGE_TYPE_DIRECT, true);
channel.exchangeDeclare(delayExchangeName, EXCHANGE_TYPE_DIRECT, true);
// 任务队列 B
HashMap<String, Object> argumentsTask = Maps.newHashMap();
argumentsTask.put("x-dead-letter-exchange", delayExchangeName);
argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName);
channel.queueDeclare(taskQueueName, true, false, false, argumentsTask);
channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName);
// 延迟队列 A
HashMap<String, Object> argumentsDelay = Maps.newHashMap();
argumentsDelay.put("x-dead-letter-exchange", taskExchangeName);
argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName);
argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL);
channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay);
channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName);
channel.close();
}
/**
* 注册消费者
* @throws IOException
* @author roc
*/
private void registerConsumer() throws IOException {
LOGGER.info("register consumer ->{}", this);
Channel channel = connection.createChannel();
Consumer consumer = consumerRegister.register(channel);
channel.basicConsume(taskQueueName, false, consumer);
LOGGER.info("register consumer ->{} success", this);
}
/**
* 消息入队
*
* @param body 消息内容
* @param timeout 超时时间
* @param unit 超时时间单位
* @throws IOException
* @author roc
*/
public void put(byte[] body, long timeout, TimeUnit unit) throws IOException {
Preconditions.checkNotNull(body);
Preconditions.checkArgument(timeout >= 0);
Preconditions.checkNotNull(unit);
LOGGER.info("put element to delay queue ->{}", body.hashCode());
Channel channel = null;
try {
channel = connection.createChannel();
// deliveryMode=2 标识任务的持久性
long millis = unit.toMillis(timeout);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build();
channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body);
LOGGER.info("put element to delay queue success");
} finally {
if (null != channel)
channel.close();
}
}
public static class Builder {
private Connection connection;
private ConsumerRegister consumerRegister;
private String delayExchangeName;
private String delayQueueName;
private String delayRoutingKeyName;
private long perDelayQueueMessageTTL;
public Builder setConnection(Connection connection) {
this.connection = connection;
return this;
}
public Builder setDelayExchangeName(String delayExchangeName) {
this.delayExchangeName = delayExchangeName;
return this;
}
public Builder setDelayQueueName(String delayQueueName) {
this.delayQueueName = delayQueueName;
return this;
}
public Builder setDelayRoutingKeyName(String delayRoutingKeyName) {
this.delayRoutingKeyName = delayRoutingKeyName;
return this;
}
public Builder setConsumerRegister(ConsumerRegister consumerRegister) {
this.consumerRegister = consumerRegister;
return this;
}
public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) {
this.perDelayQueueMessageTTL = unit.toMillis(timeout);;
return this;
}
public RabbitMQDelayQueue build() throws IOException {
Preconditions.checkNotNull(connection);
Preconditions.checkNotNull(delayExchangeName);
Preconditions.checkNotNull(delayQueueName);
Preconditions.checkNotNull(delayRoutingKeyName);
Preconditions.checkNotNull(consumerRegister);
return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName, delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL);
}
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package hbec.app.stock.rabbitmq.utils;
import hbec.app.stock.rabbitmq.utils.RabbitMQDelayQueue.ConsumerRegister;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 测试demo
*
*/
public class RabbitMQDelayQueueTest {
public static void main(String[] args) throws IOException {
delayQueue();
}
public static void delayQueue() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
Address address = new Address("10.0.30.67", 56720);
Connection connection = factory.newConnection(new Address[] { address });
RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder().setConnection(connection).setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS).setDelayExchangeName("delay_exchange_roc").setDelayQueueName("delay_queue_roc").setDelayRoutingKeyName("delay_routing_key_roc").setConsumerRegister(new ConsumerRegister() {
@Override
public Consumer register(Channel channel) throws IOException {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
// TODO do something
String content = new String(body, Charset.forName("utf-8"));
System.out.println("receive message --- > " + content);
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
System.out.println("xDeath--- > " + xDeath);
if (xDeath != null && !xDeath.isEmpty()) {
Map<String, Object> entrys = xDeath.get(0);
}
}
// 消息拒收
// if(do something) 消息重新入队
getChannel().basicReject(deliveryTag, false);
// else 消息应答
// getChannel().basicAck(deliveryTag, false);
}
};
}
}).build();
delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);
}
}