MQ系列(七):RabbitMQ连接,交换器,队列,发送,消费,确认,拒绝的使用

现在的 Java 应用大多会基于 Spring 框架开发,Spring 为集成 RabbitMQ 提供了封装好的依赖库 spring-rabbit,一些重复性的工作只需要通过 XML 一次性配置。Spring Boot 也为 RabbitMQ 提供了 start 依赖库,少许配置就可直接使用。

封装带来了使用的便捷性,但仍有必要了解其内部的一些运行流程和机制,方便在实际开发过程中灵活运用,也易于快速定位和解决问题。

本篇对 RabbitMQ 原生 Java 客户端 amqp-client 的运行流程进行讲解,大致涉及连接,交换器,队列,发送消息,消费消息,消息消费确认,拒绝消息等。

先看下上一篇博文 MQ系列(七):RabbitMQ连接,交换器,队列,发送,消费,确认,拒绝 中的 Java 集成 RabbitMQ 小节。

连接

服务间要通道,必须先建立连接,RabbitMQ 也一样,除了创建连接外,还要创建 信道,因为 RabbitMQ 的大多数操作都是在 信道 上操作的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123456");
factory.setHost("192.168.220.129");
factory.setPort(5672);
factory.setVirtualHost("/");
// 可以使用 URI 的方式
//factory.setUri("amqp://username:password@server_ip:port/virtualHost");

// 创建连接
Connection connection = factory.newConnection();
// 创建信道,用于发送和接收消息
Channel channel = connection.createChannel();

连接通常都会通过连接工厂来创建,Connection 可以创建多个 Channel 实例,应为每个线程创建 Channel, Channel 不参在线程间共享。

如果在使用 Channel 时其已处于关闭状态,则会抛出异常:com.rabbitmq.client.ShutdownSignalException,同时也要捕获 IOException 或者 SocketException ,以防 Connection 意外关闭。

交换器

使用 RabbitMQ 必须先声明 交换器队列

声明交换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 声明交换器
String exchangeName = "news-exchange";
//交换器类型枚举类 BuiltinExchangeType
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
// 自动创建队列名,从信道中获取队列名只对同一个 Connection 有效,同一个 Connection 不同 Channel 可共用
//String queueName = channel.queueDeclare().getQueue();

// 声明一个持久化,非排他,非自动删除的队列, 在应用中共享一个队列
String queueName = "news-queue";
channel.queueDeclare(queueName, true, false, false, null);
// 路由键
String routingKey = "news-routingKey";
// 队列与交换机绑定
channel.queueBind(queueName, exchangeName, routingKey);

交换器声明 exchangeDeclare 和 队列声明 queueDeclare 都有多种重载形式,只能传入的参数不同,根据需要调用。

  • exchangeDeclare:返回的是Exchange.DeclareOk,即待等服务器返回 Exchange.Declare-Ok命令

    1
    2
    3
    4
    5
    6
    Exchange.DeclareOk exchangeDeclare(String exchange,
    BuiltinExchangeType type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;
    • exchange:交换器名
    • type:交换器类型,BuiltinExchangeType 是个枚举类,还有方法可以传入字符串
    • durable:是否持久化
    • autoDelete:至少有一个队列和交换器与此交换器绑定,否则自动删除此交换器
    • internal:是否内置交换器。true表示内置,客户端无法直接发送消息到此交换器,只能通过交换器路由到交换器的方式
    • arguments:其他一些结构化参数
  • exchangeDeclareNoWait:没有返回值,比 exchangeDeclare 多设置了一个 AMQP 中 Exchange.Declare 命令的 nowait 参数,表示不需要服务器返回。

    1
    2
    3
    4
    5
    6
    void exchangeDeclareNoWait(String exchange,
    String type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;

    可能出现声明了交换器而服务器未创建完成,客户端就立即使用,就会出现异常,一般不常用。

  • exchangeDeclarePassive:增加了检测相应的交换器是否存在,存在则正常返回,否则抛出异常:404 channel exception,同时 channel 也会关闭。

    1
    Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

删除交换器

1
2
3
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
  • exchange:交换器名
  • ifUnused:为 true,交换器在没有被使用的情况下删除;为 false ,只要调用就被删除。

交换器之间绑定

在 AMQP 0-9-1 中,queue.bind方法将队列绑定到交换器,以便消息从交换器(源)流到队列(目标)

RabbitMQ 引入了exchange.bind方法,该方法将一个交换绑定到另一个交换,这样可以创建丰富的路由拓扑。

exchange.bind方法中的 sourcedestination 字段反映了消息的流动,从源交换器目标交换器exchange.bind 也像 queue.bind一样,可以在同一绑定端点之间创建多个不同的绑定。

当所有绑定被删除时,声明为自动删除的交换器将被删除,无论这些绑定是队列还是交换。注意,只有当源交换器被删除,启用了自动删除的绑定交换器才会被删除。

交换器之间绑定的方法

1
2
3
4
5
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

消息从 source 交换器转发到 destination 交换器。

交换器绑定到交换器示例

1
2
3
4
5
6
7
8
9
10
11
// 声明2个交换器
channel.exchangeDeclare("source", "direct", false, true, null);
channel.exchangeDeclare("destination", "fanout", false, true, null);
// 2个交换器绑定
channel.exchangeBind("destination", "source", "exKey");
// 声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 队列绑定到目标交换器
channel.queueBind(queueName, "destination", "");
// 消息发送到源交换器
channel.basicPublish("source", "exKey", null, "msg content".getBytes());

交换器之间解绑

有绑定,就有解绑的方法,如下

1
2
3
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

队列

声明队列

  • queueDeclare:返回 Queue.DeclareOk

    1
    2
    3
    4
    // 默认创建一个由 RabbitMQ 命名的,排他的,自动删除的,非持久化的队列
    Queue.DeclareOk queueDeclare() throws IOException;
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;
    • exclusive:为 true 时,设置队列排它,仅对首次声明它的连接有效,在连接断开时自动删除。排它队列基于连接可见,其他连接不允许建立同名的排他队列。
    • autoDelete:为 true 则,至少要有一个消费者连接到此队列,否则自动删除。
  • queueDeclareNoWait:调用传参同 queueDeclare 方法,无返回值,即不需要等待服务器创建对列完成的响应确认。

  • queueDeclarePassive:检则队列是否存在,存在则正常返回,不存在则抛出异常:404 channel exception。

删除队列

1
2
3
4
5
Queue.DeleteOk queueDelete(String queue) throws IOException;
// ifUnused 未使用时删除, ifEmpty 未空时删除
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
// 无返回值,不需等待服务器响应
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

清空队列

清空队列内容,而不是删除队列本身

1
Queue.PurgeOk queuePurge(String queue) throws IOException;

队列绑定交换器

队列必须且只能与交换器绑定,消息才能被交换器路由到匹配的队列

1
2
3
4
5
//queue-队列名,exchange-交换器名,routingKey-路由键,arguments-绑定的参数
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
// 无返回值,无需等待
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

队列与交换器解绑

1
2
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

发送消息

1
2
3
4
5
6
7
8
9
10
//exchange-交换器名,routingKey-路由键,props-消息属性,body-消息体
//BasicProperties:可设置contentType,contentEncoding,deliveryMode,priority等,
//BasicProperties:可使用 MessageProperties 枚举类,或使用 new AMQP.BasicProperties.Builder().属性设置
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
//为了可以更好地控制发送,可以使用 mandatory 这个参数,交换器找不到队列时将消息返还给生产者,false则直接丢弃
//mandatory 交换器将消息路由到队列时,发现不存在消费者时消息不入队列并返回给生产者
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

消费消息

RabbitMQ 消费消息有 2 种模式:推(Push)模式 和 拉(Pull)模式。

  • 推模式:采用 Basic.Consumer 进行消费
  • 拉模式:采用 Basic.Get 进行消费

推模式

推模式使用订阅的方式来消费消息,RabbitMQ 会不断地推送消息给消费者(受 Basic.Qos命令限制(channel.basicQos(64)))。

一般通过继承 Consumer 接口或继承 DefaultConsumer 类来实现。

1
2
3
4
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
  • consumerTag:消费者标签,用来区分多个消费者

  • noLocal:为 true 时,不能将同一个 Connection 中生产者发送的消息传送给这 Connection 中的消费者。

    简单理解就是,不能自己发自己消费。

  • exclusive:是否排它

  • deliverCallback:消息被传递时通知的回调接口

  • cancelCallback:消费者退出时通知的回调接口

  • shutdownSignalCallback:当消费者信道或基础连接关闭时通知的回调接口。

Consumer 接口的默认实现类 DefaultConsumer 类中处理消息的方法 handleDelivery是空方法,需要重写。还有一些其它空方法支持重写以实现定制化需求。

1
2
3
4
5
6
7
8
void handleConsumeOk(String consumerTag); // 1
// 显式取消消费者订阅时(Channel.basicCancel)调用
void handleCancelOk(String consumerTag) {} // 3
void handleCancel(String consumerTag) throws IOException {}
// 当 channel 或 connection 关闭时调用
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {}
void handleRecoverOk(String consumerTag) {}
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {} // 2

使用如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//关闭自动确认
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey1 = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("routing key:" + routingKey1);
System.out.println("content type:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body, "UTF-8");
System.out.println("消息内容:" + msg);
//手动确认
channel.basicAck(deliveryTag, false);
}
});

拉模式

拉模式的使用:channel.basicGet(queue, autoAck),返回 GetResponse ,如果要获取单条信息,建议使用此 拉模式。注意:Basic.Get 放在一个循环里代替 Basic.Consumer,这样做会严重影响 RabbitMQ 的性能。

1
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

要实现高吞吐量,消费者理应使用方法。

消费确认

RabbitMQ 提供了消息确认机制,参考 MQ系列(五):RabbitMQ 之 AMQP 0-9-1 模型和概念说明 > 消息确认

消费者通过 Basic.ConsumerBasic.Get 命令消费消息(下面方式),可以指定 autoAck 参数,默认是 true 自动确认,RabbitMQ 会自动把发送出去的消息置为确认,然后从队列(磁盘)中删除,不管消费者是否真正地消费到这些消息。

1
2
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

如果显式设置为 false,RabbitMQ 会一直等待,直到消费者显示调用了 Basic.Ack命令为止,方法调用如下:

1
2
# 即一般处理消息的末尾调用 channel.basicAck(deliveryTag, false) 方法
void basicAck(long deliveryTag, boolean multiple) throws IOException;

如果 RabbitMQ 一直没有收到预期的消费确认,并且此消费者连接已经断开,则会将消息重新入队,等待投递给下一个消费者。RabbitMQ 不会为未确认的消息设置过期时间,判断是否重新投递给其它消费者的唯一依据是该消费者连接是否已经断开。这样设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

消费拒绝

拒绝一条

RabbitMQ 支持明确地拒绝当前消息,使用 Basic.Reject命令,一次只能拒绝一条消息。方法调用如下:

1
2
3
4
5
/**
* deliveryTag: 可理解为消息ID,是一个64位的长整型值
* requeue: true 重新排队发送其它消费者;false 立即把消息从队列中移除(丢弃或入死信队列),不会发给其它消费者
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;

拒绝多条

如果要拒绝多条消息(批量拒绝),则可以使用 Basic.Nack命令。方法调用如下:

1
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • multiple:是否拒绝多条。为false时,表示拒绝编号为 deliveryTag 的消息,效果于 basicReject 一样;为 true 时,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。

basicRejectbasicNack 方法中的 requeue 设置为 false,可以启用 死信队列(dead-lettered),用于检测被拒绝或者未送达的原因。

消息重发

RabbitMQ 支持重新发送未被确认的消息,使用 Basic.Recover 命令,让未确认的重入队列。方法调用 :channel.basicRecover()

1
2
Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
  • requeue:不传默认为 true,未被确认的消息重入加入队列,同一条消息可能被分配置给与之前不同的消费者;为false时,同一条消息会被分配给与之前相同的消费者。

关闭连接

应用在使用完后,需关闭连接,释放资源。方法调用如下:

1
2
3
//关闭信道和连接
channel.close();
connection.close();
1
2
3
4
// 关闭对象
void close() throws IOException, TimeoutException;
// 显式指定关闭编码和关闭原因
void close(int closeCode, String closeMessage) throws IOException, TimeoutException;

在 Connection 关闭时,Channel 也会被自动关闭(Channel 继承了 AutoCloseable)。

AMQP 协义中的 Connection 和 Channel 采用相同的方式来管理网络失败,内部错误和显式地关闭连接。

Connection 与 Channel 的状态如下:

  • Open:开启状态,表示对象可用
  • Closing:正在关闭状态。当前对象被显式调用关闭方法(shutdown),就产生了一个关闭请求让其内部对象进行相应操作,并等待这些关闭操作的完成。
  • Closed:已经关闭状态。也是最终状态。

ConnectionChannel 都继承了 ShutdownNotifier 关闭通知接口,支持添加关闭监听器。

ShutdownNotifier 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.rabbitmq.client;

public interface ShutdownNotifier {
/**
* 添加关闭监听器
*/
void addShutdownListener(ShutdownListener listener);

/**
* 移除关闭监听器
*/
void removeShutdownListener(ShutdownListener listener);

/**
* 获取关闭原因
*/
ShutdownSignalException getCloseReason();

/**
* Protected API - 通知监听器 ShutdownListener
*/
void notifyListeners();

/**
* 检测对象是否处于开启状态。 如果正在关闭, 返回false
* 注意:状态可以在调用后更改的
* 不建议使用,内部依赖 shutdownCause 使用了同步锁,可能出现竞争的情况
*/
boolean isOpen();
}

ShutdownListener 监听器:就一个方法

1
2
3
4
@FunctionalInterface
public interface ShutdownListener extends EventListener {
void shutdownCompleted(ShutdownSignalException cause);
}

ShutdownSignalException 提供一些方法来分析关闭原因

1
2
3
4
5
6
7
8
// true 表示 connection 错误,false 表示 channel 错误
public boolean isHardError() { return _hardError; }
// true 表示异常由显示操作引起;false 表示源于 Broker 或检测到非故意失败而导致
public boolean isInitiatedByApplication() { return _initiatedByApplication; }
// 获取关闭原因
public Method getReason() { return _reason; }
// 对触发信号的Connection或Channel对象的引用
public Object getReference() { return _ref; }

相关参考

  1. 官方:RabbitMQ Connections

MQ系列(七):RabbitMQ连接,交换器,队列,发送,消费,确认,拒绝的使用

http://blog.gxitsky.com/2020/02/19/MQ-07-RabbitMQ-Prd-C-X-Q-ack-reject/

作者

光星

发布于

2020-02-19

更新于

2022-06-07

许可协议

评论