MQ系列(五):RabbitMQ 之 AMQP 0-9-1 模型和概念说明

要使用 RabbitMQ,需先理解其基本概念,而 RabbitMQ 原生是实现 AMQP 0-9-1 版本协议的,RabbitMQ 基本概念来自于 AMQP 0-9-1 协议中的定义,所以就有必要了解协义中的相关概念。

AMQP 0-9-1(Advanced Message Queuing Protocol:高级消息队列协议)是一种消息传递协议,它两个应用可以通过中间件代理通信。RabbitMQ 实现该版本的协议,下面描述的概念和述语也是基本此版本协议。

RabbitMQ 官方文档:AMQP 0-9-1 模型说明及相关概念快速参考:AMQP 0-9-1 Quick Reference

简要模型

AMQP 0-9-1模型具有以下世界观:消息被发布到交换器,而交换器通常被比作邮局或邮箱。然后,交换器使用绑定(规则)将消息副本分发到队列;然后, Broker 要么将消息传递给订阅了队列的消费者,要么消费者按需从队列中 获取 / 拉取(fetch/pull) 消息。

AMQP Model Example

发布消息时,发布者可以指定各种消息属性(元数据)。这些元数据中的一些可能由代理使用,但是,它的其余部分对代理来说是完全不透明的,并且仅由接收消息的应用程序使用。

网络不可靠,应用程序可能无法处理消息,因此 AMQP 0-9-1 模型具有消息确认的概念:当消息传递给消费者时,消费者自动或手动立即通知代理。当消费消息确认时,代理只有在收到该消息(或消息组)的通知时才会从队列中完全删除消息。

在某些情况下,例如,当无法路由消息时,消息可能会返回给发布者,被丢弃,或者(如果代理实施了扩展)会将消息放入所谓的死信队列中。 发布者通过使用某些参数发布消息来选择如何处理这种情况。队列,交换和绑定统称为AMQP实体

交换器

Exchange 是发送消息的实例。交换器接收消息,并将其路由到零个或多个队列中。使用的路由算法取决于交换类型和称为绑定的规则。 AMQP 0-9-1 Broker 提供四种交换类型:

Exchange Type Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

Exchange 还声明了许多属性,其中最重要的是:

  • Name:名称。
  • Durability:是否具有持久性。
  • Auto-delete:当最后一个队列被解绑定后,exchange 会被删除。
  • Arguments: 可选项,由插件和 broker 特定功能使用。

交换器可以是 持久性的 或 短暂性 的。 持久性 交换器可在 broker 重新启动后继续存在,而 短暂性 交换器则不能(当 broker 重新上线时必须重新声明)。 并非所有场景要求交换器具有持久性。

Direct Exchange

Direct Exchange(直接交换器)基于消息路由键将消息传递到队列。直接交换器 是单播消息路由的理想选择(也可以用于多播路由)。工原理如下:

  • 队列通过绑定键 K 绑定到交换器。(绑定键也是路由键,这里称为绑定键以做区分)
  • 当带有路由键 R 的新消息到达直接交换器时,如果 K = R,则交换器将消息路由到队列。

直接交换器是完全匹配,单播模式。通常用于以轮循方式在同一个程序的多个实例之间分配任务。这样做的重点是要理解,在 AMQP 0-9-1 中,消息是在消费者之间进行负载均衡,而不是在队列之间。

默认交换器

RabbitMQ 默认交换器 就是 直接交换器,是由 broker 预先声明的不带名称(空字符串)的直接交换器。

例如,当声明名称为 search-indexing-online 的队列时,AMQP 0-9-1 broker 将使用 search-indexing-online 作为路由键将其绑定到默认交换器。 因此,使用路由键 search-indexing-online 发布到默认交换机的消息将被路由到search-indexing-online队列。 使得默认交换器看起来有可能将消息直接传递到队列。

直接交换器示意图:

直接交换器示意图

Fanout Exchange

Fanout Exchange(扇出交换器)将消息路由到与其绑定的所有队列,并且忽略路由键。

如果有 N 个队列绑定到扇出交换器,当有新消息发布到该交换器时,消息副本将被传递给所有队列(N个)。扇出交换器是广播消息路由的理想选择。

因为扇出交换器将消息的副本传递到绑定到它的每个队列,所以它适用于以下使用场景:

  • 大型多人在线(MMO)游戏可以将其用于排行榜更新或其他游戏全局事件。
  • 体育新闻网站可以使用扇出交换器来近乎实时地向移动客户端分发得分更新。
  • 分布式系统可以广播各种状态和配置更新。
  • 群组聊天可以使用扇出交换器在参与者之间分发消息(尽管 AMQP 没有内置的在线状态概念,因此 XMPP 可能是更好的选择)。

扇出交换器示意图:

扇出交换器示意图

Topic Exchange

Topic Exchange(主题交换器)根据消息路由键和某种模式(pattern:用于将队列绑定到交换)之间的匹配,将消息路由到一个或多个队列。

主题交换器类型通常用于实现各种 发布 / 订阅模式变体。主题交换器通常用于消息的多播路由。主题交换器有非常广泛的使用场景。当问题涉及多个 消费者/应用程序,这些消费者/应用程序选择他们希望接收的消息类型时,应考虑使用主题交换器。

使用场景示例:

  • 分发与特定地理位置有关的数据,例如 销售点。
  • 由多个工作者完成的后台任务处理,每个工作者可以处理特定的任务集。
  • 股票价格更新(以及其它金融/财务数据的更新)。
  • 涉及分类或标记的新闻的更新(例如,针对特定运动或团队)。
  • 云中的各种服务的编排。
  • 分布式体系结构/特定于(各种)操作系统的软件构建或打包,其中每个构建器只能处理一个体系结构或操作系统。

Headers Exchange

Headers Exchange(头交换器)旨在用于在多个属性上进行路由,这些属性比路由键更容易表示为消息头。

头交换器忽略了路由键属性,而是使用头属性(headers)替代路由的属性,如果头的值等于绑定时指定的值,则认为消息匹配。可以使用多个头属性进行匹配,将队列绑定到头交换。

此情况下,broker 需要应用开发人员提供另一条消息,即它是否应考虑具有任何匹配头的消息,或者考虑具有所有匹配头的消息。这就是 x-match 绑定参数的作用。 当x-match参数设置为 any 时,仅一个匹配的标头值就足够了。 或者,将 x-match 设置为 all 要求所有值都必须匹配。

头交换器可以看作是 【direct exchanges on steroids】。由于它们基于头值进行路由,因此可以将它们用作直接交换器,而路由键不必字符串。例如,可以是整数或哈希(字典)。

注意:以字符串 x- 开头的头属性将不用于评估匹配项。头交换器性能较直接交换器差很多,目前几乎不用了。

队列

AMQP 0-9-1 模型中的 Queues(队列)与其他消息和任务队列系统中的队列非常相似:它们存储被应用程序消费的消息。队列与交换器共享一些属性,也有一些其他属性:

  • Name:名称
  • Durable:持久性(broker 重启后是队列否存在)
  • Exclusive:独占性(仅由一个连接使用,并且该连接关闭时队列将被删除)
  • Auto-delete:自动删除(当最后一个消费者退订时,具有至少一个消费者的队列将被删除)
  • Arguments:(可选,由插件和特定于 broker 的功能使用,例如消息 TTL,队列长度限制等)

队列必须先声明,才能使用。声明队列将导致它被创建(如果不存在),如果队列已存在且其属性与声明中的相同,则该声明无效;如果队列已存在但队列属性与声明中的属性不同时,将引发错误编码为 406(PRECONDITION_FAILED) 的信道级异常。

队列名称

Queue Names(队列名称):应用程序可以选择队列的名称,也可以要求代理为其创建队列名。

队列名称最多可包含 255 个字节的 UTF-8 字符。

AMQP 0-9-1 broker 可以生成代表应用程序的唯一的队列名称。 要使用此功能,需传递一个空字符串作为队列名称参数,生成的名称将随队列声明响应一起返回给客户端。

amq. 开头的的队列名为保留关键字,供 broker 内部使用。若违返此规则将导致信道级异常,其响应错误编码为 403(ACCESS_REFUSED)。

队列持久化

Queue Durability(队列持久化):持久队列将持久化到磁盘,因此在 broker 重启后仍然存在,非持久的队列称为 临时队列。 并非所有场景都要求队列是持久的。

队列的持久性不会使路由到该队列的消息持久化。 如果关闭 broker 然后将其恢复,则在 broker 启动期间将重新声明持久队列,但是,仅持久消息将被恢复。

绑定

Bindings(绑定)是交换器用于将消息路由到队列的规则。为了指示交换器 E 将消息路由到队列 Q,必须将 Q 绑定到 E。

绑定可能具有某些交换器类型使用的可选路由键属性。路由键的目的是选择发布到交换机的某些消息以路由到绑定队列。换句话说,路由键的作用类似于过滤器。

做个类比:

  • Queue 就像你在纽约市的目的地。
  • Exchange 就像肯尼迪国际机场。
  • Bindings 是从肯尼迪国际机场到目的地的路线。可以有零种或多种方式达到目的地。

有了 绑定 这间接层的支持,就可以使用直接发布到队列来实现不可能或很难实现的路由方案,而且消除了应用开发人员必须做的某些重复工作。

如果消息无法路由到任何队列(例如,由于没有发布该消息的绑定),则该消息将被丢弃或返回给发布者,具体取决于发布者设置的消息属性。

消费者

消息存储在队列中是没有用的,直到应用程序消费了它们。在 AMQP 0-9-1 模型中,有两种方式来消费消息:

  • 将消息推送给客户端应用(push API)。
  • 客户端应用根据需要拉取消息(pull API)。

使用推送方式,消费者必须表明对来自特定队列的消息的消费兴趣,当这样做时说明注册了一个消费者,或者说,订阅了一个队列。每个队列可能有一个以上的消费者,或者注册一个独占(排他)的消费者。

每个消费者(订阅)都有一个称为消费者标签的标识符(消费者 ID),可用于退订消息。消费者标签只是字符串。

消息确认

Message Acknowledgements(消息确认):Consumer applications(消费者应用)有时可能无法处理消息,有时甚至会崩溃,网络不可靠也可能引发问题。也引出了一个问题:broker 何时将消息从队列中删除?,因此通常需要进行某种确认。

有时仅需要确认已收到消息,有时确认意味着消息已被消费者验证和处理。

AMQP 0-9-1 规范提供了称为消息确认(或称为 ack)的内置功能,使消费者可以对此进行控制,用于确认消息的传递 和/或 处理。有两种确认模式acknowledgement modes):

  • 自动确认:在 broker 发送消息到应用程序之后(使用 basic.deliverbasic.get-ok 方法)。
  • 显式确认:应用程序返回确认之后(使用 basic.ack 方法)。

显示确认(或称为手动确认),需要应用程序选择保时返回确认消息。可能在收到消息之后,或者在将消息持久化到数据存府之前,或在完全处理消息之后。

如果消费者应用崩溃(关闭连接时AMQP代理会注意到此情况),如果 AMQP broker 预期要收到的消息确认而未收到,则会对该消息重新排队(并可能立即重传给另一个消费者,如果存在的话);或者,如果当时没有其它可用的消费者的话,则 broker 将一直等待,直到至少一个消费者注册到同一队列之后再尝试重新发送消息。

拒绝消息

Rejecting Messages(拒绝消息) :当消费者应用程序接收到消息时,该消息的处理可能会成功,也可能不成功。应用程序可以返回拒绝消息 给 broker 以示表消息处理失败(或此时无法完成)。

当拒绝消息时,可以要求 broker 丢弃或重新排队。当队列中只有一个消费者时,要确保不会因反复拒绝和重新排队导致创建无限的消息传递循环,需要有重试机制的次数限制。

批量拒绝

Negative Acknowledgements(否定确认/批量拒绝):使用 basic.reject 方法拒绝消息有一个限制,不能一次拒绝多条消息。 RabbitMQ 为批量拒绝消息提供了解决方案。 RabbitMQ提供了AMQP 0-9-1扩展,称为 否定确认(批量拒绝确认) 或 否定

更多信息,请参阅 确认Confirmations)和 basic.nack 扩展basic.nack extension )指南。

预取消息

Prefetching Messages(预取消息):对于多个消费者共享一个队列的情况,在消费者回传下一个确认之前,可以指定每个消费者一次可接收消息的数量是非常用的(即在消费者返回 ACK 之前,broker 仍会向消费者发送消息的数量,这部分消息是预取的,缓存在客户端本地待处理的)。

这可以用作一种简单的负载均衡技术,或者在消息倾向于批量发布时提高吞吐量。

注意:RabbitMQ 只支持通道级预取计数,而不支持基于连接或大小的预取。

消息属性与负载

AMQP 0-9-1 模型中的消息具有属性的。 有些很常规的属性,AMQP 0-9-1 规范对它他进行了定义,应用开发人员不必考虑确切的属性名称。 一些例子是:

  • Content type:内容类型
  • Content encoding:内容编码
  • Routing key:路由键
  • Delivery mode (persistent or not):传输模式(持久化 或 非持久化)
  • Message priority:消息优先级
  • Message publishing timestamp:消息发布的时间戳
  • Expiration period:有效期(到期时间)
  • Publisher application id:发布者应用 ID

一些属性被 AMQP broker 使用,但大多数属性由消费者端解析。一些属性是可选的,称为消息头(heards)。它们类似于 HTTP 中的 X-Header,消息发布时设置消息属性。

消息还包含有效负载(携带的数据),AMQP broker 将其视为不透明的字节数组,broker 不会检查或修改负载。消息还可能只包含 属性 而没有负载。

通常使用 JSON,Thrift,Protocol Buffers 和 MessagePack 等序列化格式来对结构化的数据进行序列化,以便将其发布为消息有效负载。通常使用 content-typecontent-encoding 字段来传递有效负载的数据格式。

消息可能被发布为持久消息,这使 broker 将消息持久化到磁盘。 如果重新启动服务器,系统将确保接收的持久消息不会丢失。
简单地将消息发布到持久性交换器或路由到持久性队列并不能使消息持久化:这完全取决于消息本身的持久性模式。 将消息持久发布会影响性能(就像数据存储一样,持久性会以一定的性能成本为代价)。

可以从 Publishers guide(发布者指南)中了解更多。

方法

AMQP 0-9-1 由很多方法构造,方法是一系列操作(如 HTTP 方法),与面向对象编程语言中的方法没有任何共同之处。

AMQP 0-9-1 中的协议方法分为几类(classes)。 类(Classes)只是 AMQP 方法的逻辑分组。 AMQP 0-9-1参考包含所有AMQP方法的完整详细信息。

让我们看一下交换类(exchange class),这是一组与交换操作有关的方法。 它包括以下操作:

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

上面的操作形成了逻辑对:exchange.declareexchange.declare-okexchange.deleteexchange.delete-ok。这些操作是 请求(requests (客户端发送))和 响应(response(由代理响应上述 请求 ))

示例参考

  1. 客户端要求 Broker 使用 exchange.declare方法声明新的交换器,见下图:

    exchange.declare

  2. 如上图所示,exchange.declare带有几个参数。 它们使客户端可以指定交换器名称,类型,持久性标记等。

    如果操作成功,则 Broker 将使用 exchange.declare-ok方法进行响应,见下图:

    exchange.declare-ok

    exchange.declare-ok 除了通道号外不带任何参数。

  3. 对于AMQP 0-9-1 队列方法类上的另一个方法对,事件的顺序非常相似:queue.declarequeue.declare-ok

    queue.declare

    queue.declare-ok

注意:并非所有的 AMQP 0-9-1 方法都有对应的方法。 有些(basic.publish是使用最广泛的一种)没有相应的响应(response)方法,而另一些(例如 basic.get)则具有一种以上响应(response)方法。

连接

Connections(连接):AMQP 0-9-1 连接通常是长连接。 AMQP 0-9-1 是使用 TCP 进行可靠传输的应用程序级别协议。 连接使用身份认证,并且可以使用TLS进行保护。

当不再需要将应用程序连接到服务器时,它应优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭基础 TCP 连接。

信道

Channels(信道/通道):某些应用程序需要多个与 broker 的连接。 但是,不希望同时创建多个 TCP 连接,因为这样做会消耗系统资源,并且使配置防火墙更加困难。 AMQP 0-9-1 连接与可以被认为是 【共享单个 TCP 连接的轻量级连接】的 信道 复用。

客户端执行的每个协议操作都是发生在通道上的。 不同通道的各自通信是完全独立分开的。因此每种协议方法还带有一个通道ID(又称通道号),是一个整数,是 broker 和 客户 端用来确定该方法用于哪个通道。

通道仅存在于连接的上下文中,而不是单独存在的。 关闭连接后,其上的所有通道也将关闭。

对于使用多线程 / 进程进行处理的应用程序,通常做法是为每个线程/进程开启一个新通道,而不在它们之间共享通道。

虚拟机

Virtual Hosts(vhost:虚拟机):为了使单个 broker 可以承载多个隔离的环境(用户组,交换,队列等),AMQP 0-9-1支持了虚拟主机(vhost)的概念。

它们类似于许多流行的 Web 服务器使用的虚拟主机,并提供 AMQP 实体所在的完全隔离的环境。 协议客户端指定在连接协商期间要使用的虚拟主机。

可扩展

AMQP is Extensible(AMQP 是可扩展的),AMQP 0-9-1具有几个扩展点:

  • 自定义交换器使开发人员可以实现路由方案,这些路由方案提供已有的交换器类型不能很好满足的场景。例如,基于地理数据的路由。
  • 交换器和队列的声明可以包含 broker 可以使用的其他属性。例如,RabbitMQ 中的每个队列消息 TTL 是通过这种方式实现的。
  • 协议的特定于 Broker 的扩展。例如,参考 RabbitMQ 实现的扩展
  • 可以引入新的 AMQP 0-9-1 方法类
  • 可以使用其它插件来扩展 broker。例如,RabbitMQ 的 WEB管理端和 HTTP API 是插件实现的。

这些功能使 AMQP 0-9-1 模型更加灵活,可适用于非常广泛的问题。

客户端生态

有许多基于不同流行编程语言和平台的 AMQP 0-9-1 客户端。 其中一些紧密遵循AMQP术语,仅提供AMQP方法的实现。 其他一些提供了额外的功能,便捷方法和抽象。 有些客户端是异步的(非阻塞),有些是同步的(阻塞),有些则支持这两种模型。 一些客户端支持特定于供应商的扩展(例如,特定于 RabbitMQ 的扩展)。

因为 AMQP 的主要目标之一是互操作性,所以对于开发人员来说,理解协议操作而不是将自己局限于特定客户端库的术语是一个好主意。 与使用不同库的开发人员进行通信的这种方式将大大简化。

Java and Spring

Java

Spring Framework

相关参考

  1. AMQP 0-9-1 Model Explained(模型说明)
  2. AMQP 0-9-1 Complete Reference Guide(完整参考指南)

MQ系列(五):RabbitMQ 之 AMQP 0-9-1 模型和概念说明

http://blog.gxitsky.com/2020/02/16/MQ-05-RabbitMQ-amqp-0-9-1/

作者

光星

发布于

2020-02-16

更新于

2022-06-07

许可协议

评论