RabbitMQ java client
# 简单的发送消息
# maven导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.15.0</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 生产者
# 导包
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
1
2
3
4
2
3
4
# 创建链接并发送消息
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//声明一个队列
String queueName = "q_test";
//参数: 队列名, 是否持久化, 是否独占, 是否自动删除, 队列其他属性
channel.queueDeclare(queueName, true, false, false, null);
//发送消息
/**
参数:
交换机名称(默认交换机名称为空字符串),
routingKey 此处与队列同名,
消息的其他属性,
消息的字节数组
*/
channel.basicPublish("", queueName, null, "heheda".getBytes());
//关闭链接和通道
channel.close();
conn.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 消费者
# 导包
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
1
2
3
4
2
3
4
# 消费消息
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = "q_test";
//消息传递时的回调
//参数: 消费者标签, 传递的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
//消费消息
//参数: 队列名称, 是否自动确认, 消息传递时回调, 消费者取消时的回调
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 多消费者
即Work queues, 将上文中的消费者创建多个,同时监听某个队列的消息
# 消息确认
如果某一个消费者挂掉, 需要将其未能成功消费的消息转给其他消费者, 这时就需要手动确认消息
//修改消费端
//在回调中手动确认消息已被消费
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
//参数: 消息的标签, 是否一次确认多条
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//把是否自动确认改为false
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 消息持久化
为防止RabbitMQ挂掉后所有消息都会丢失可对消息和队列进行持久化
//声明队列时对队列进行持久化
//参数: 队列名, 是否持久化, 是否独占, 是否自动删除, 队列其他属性
channel.queueDeclare(queueName, true, false, false, null);
/**
发送消息时对消息进行持久化
参数:
交换机名称(默认交换机名称为空字符串),
routingKey 此处与队列同名,
消息的其他属性,
消息的字节数组
*/
channel.basicPublish("",
queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"heheda".getBytes());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 公平调度
RabbitMQ分派消息时默认是平均分配的, 但如果有的任务耗时较长, 就容易出现有的消费者一直工作, 有的消费者很闲的情况. 想要解决这个问题可以把消费者的Qos设为1, 这样消费者在确认消息之前就不会收到新的消息, 并且会把消息分配给下一个空闲的消费者.
//消费端
channel.basicQos(1);
1
2
2
# 发布/订阅
一次向多个消费者发送消息
# 生产者
以下会省略通用代码
//声明一个扇形交换机
String exchangeName = "logs";
channel.exchangeDeclare(exchangeName, "fanout");
//发送消息, routingKey的值在扇形交换机中会被忽略,所以直接设为空字符串
channel.basicPublish(exchangeName, "", null, "heheda".getBytes());
1
2
3
4
5
2
3
4
5
# 消费者
//声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定扇形交换机
channel.queueBind(queueName, "logs", "");
//消息传递时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
};
//消费消息
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 有选择的接收
比如需要有一个队列对error级别的日志进行存储, 另一个队列把所有级别的日志信息打印在控制台
# 生产者
//声明一个直连交换机
String exchangeName = "logs";
channel.exchangeDeclare(exchangeName, "direct");
//发送消息, routingKey对应四种日志级别: debug, info, warn, error
channel.basicPublish(exchangeName, "debug", null, "heheda".getBytes());
1
2
3
4
5
2
3
4
5
# 消费者
//声明一个error队列
String queueName = "q_error";
channel.queueDeclare(queueName, true, false, false, null);
//声明一个交换机, 并进行绑定(正常情况只需要声明一次,但此处先启动消费者,交换机还不存在所以需要先创建)
String exchangeName = "logs";
channel.exchangeDeclare(exchangeName, "direct");
//把队列跟交换机进行绑定并设置routingKey为error
channel.queueBind(queueName, exchangeName, "error");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 另一个消费者
String queueName = "q_all";
channel.queueDeclare(queueName, true, false, false, null);
String exchangeName = "logs";
channel.exchangeDeclare(exchangeName, "direct");
//绑定所有的routingKey
channel.queueBind(queueName, exchangeName, "error");
channel.queueBind(queueName, exchangeName, "warn");
channel.queueBind(queueName, exchangeName, "info");
channel.queueBind(queueName, exchangeName, "debug");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 主题交换机
*可以只替换一个单词。
# 可以代替零个或多个单词。
# 生产者
//声明交换机
String exchangeName = "e_topic";
channel.exchangeDeclare(exchangeName, "topic");
//声明队列
String queueName = "q_topic";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, "aa.*");
//发送消息
channel.basicPublish(exchangeName, "aa.bb", null, "heheda".getBytes());
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 消费者
String queueName = "q_topic";
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 参考文档
https://www.rabbitmq.com/tutorials/tutorial-one-java.html (opens new window)