RabbitMQ基于死信发送延时消息
# 达成死信条件
- 消费者使用basic.reject或 basic.nack否认消息,并将requeue参数设置为false
- 消息过期
- 因为超过所在队列长度限制被丢弃
# 使消息过期的几种方式
如果几种方式同时设置会取时间最短的时间
# 统一为队列内消息设置
//创建了一个队列,其中消息最多可驻留 60 秒
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl" , 60000);
channel.queueDeclare("myqueue", false, false, false, args);
1
2
3
4
2
3
4
# 单独为每条消息设置
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
1
2
3
4
5
2
3
4
5
# 设置队列的过期时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
1
2
3
2
3
# 为死信消息设置交换机
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
1
2
3
4
5
2
3
4
5
可以为死信消息设置routingKey
, 如果不设置则使用消息默认的。
args.put("x-dead-letter-routing-key", "some-routing-key");
1
# 示例代码
//声明一个交换机
String exchangeName = "e_dead";
channel.exchangeDeclare(exchangeName, "direct");
//声明一个队列,并指定死信消息的交换机
String queueName = "q_dead";
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , exchangeName);
channel.queueDeclare(queueName, false, false, false, args);
//声明接收死信消息的队列
channel.queueDeclare("q_delay", false, false, false, null);
//绑定交换机和队列
channel.queueBind("q_delay", exchangeName, "q_dead");
//设置消息的过期时间为10s
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("10000")
.build();
channel.basicPublish("", "q_dead", properties, "heheda".getBytes());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21