背景

在学习EventBus源码的时候,基于RabbitMQ持久化实现,顺带学习了下RabbitMQ。

  • RabbitMQ 是一个由erlang语言开发的遵循AMQP(Advanved Message Queue)协议的开源实现。

  • AMQP协议 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

应用场景

异步处理

  • 上游不关心多下游执行结果,上游执行完发送到MQ,多下游订阅MQ。

应用解耦

  • 数据驱动的任务依赖,多个任务需要轮流执行,轮流订阅上一个任务。

流量削峰

  • 异步返回执行时间长

系统架构

核心概念

RabbitMQ结构

RabbitMQ结构

名词释义

Broker
  • 消息队列服务器,接受客户端的连接,实现AMQP实体服务
Connection
  • 连接,应用程序与Broker的网络连接
ConnectionFactory
  • Connection的制造工厂
Exchange
  • 交换机,接收消息,根据路由键转发消息到绑定的队列
Exchange Types
  • 交换机类型,RabbitMQ常用的Exchange Type有 fanoutdirecttopicheaders 这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述)
Queue
  • 也称为Message Queue,消息队列,保存消息并将它们转发给消费者
Channel
  • 网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
Message
  • 消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。
Binding
  • 绑定,RabbitMQ通过绑定(Binding)将交换器(Exchange)和消息队列(Queue)关联起来,绑定(Binding)中可以包含(Binding Key),通过绑定键(Binding Key)就知道正确地将消息路由到指定的消息队列(Queue
Binding Key
  • 绑定键,它表示交换机(Exchange)和消息队列(Queue)是通过绑定键(Binding Key)进行联系,这个关系是固定的,初始化的时候,就会建立该队列
Routing Key
  • 路由键,生产者在将消息发送给交换机(Exchange)的时候,一般会指定一个路由键(Routing Key),来指定这个消息的路由规则。这个路由键(Routing Key)需要与交换机类型(Exchange Type)及绑定键(Binding Key)联合使用才能生,我们的生产者只需要通过指定路由键(Routing Key)来决定消息流向哪里。
Message acknowledgment
  • 消息回执,在实际应用中,可能会发生消费者收到消息队列(Queue)中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从消息队列(Queue)中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
  • 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑
Message durability
  • 可以将Queue与Message都设置为可持久化的(durable),在RabbitMQ服务重启的情况下,也不会丢失消息

RabbitMQ内部结构

RabbitMQ内部结构

交换机(Exchange)

交换机类型(Exchange Types)

  • fanout
    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

    exchange-fanout

    上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

  • direct
    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

    exchange-direct

    以上图的配置为例,我们以routingKey=“error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routing Key=“info”或routing Key=“warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routing Key发送消息,则消息不会路由到这两个Queue中。

  • topic
    前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

    routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”,可以配置多个单词,最大上限255字节

    binding key与routing key一样也是句点号“. ”分隔的字符串。

    binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

    exchange-topic

    以上图中的配置为例,routing Key=“quick.orange.rabbit”的消息会同时路由到Q1与Q2,routing Key=“lazy.orange.elephant”的消息会路由到Q1与Q2,routing Key=“lazy.brown.fox”的消息会路由到Q2,routing Key=“lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routing Key与Q2的两个binding Key都匹配);routingKey=“quick.brown.fox”、routingKey=“orange”、routing Key=“quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何binding Key。

  • headers
    headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
    在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

消息队列(Message Queue)

  1. 普通队列

    普通队列

    这是最简单的使用方式,生产者负责发消息,消费者负责接收消息并处理,不需要指定交换机,直接发送到队列里

  2. 工作队列

    工作队列

    这种模式出现了两个消费者,为了保证消费者之间的负载均衡和同步,需要在消息队列之间加上同步功能,工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列中。后台运行的一个工作进程将弹出任务并最终执行该任务。当你运行许多工人(消费者)时,任务将在他们之间分担。

    多个消费者同时监听一个队列,有两种分发模式:

    • 轮询分发
      • 每个消费者按照顺序分发,平均分配所有消息,不管消费者是否消费完成。
    • 公平分发
      • 每个消费者必须空闲了才会分发,能者多劳,避免消息堆积。必须关闭自动应答ack,改成手动应答。
  3. 订阅者模式

    订阅者模式

    一个生产者,多个消费者,消费者都有自己的队列,消息先发送到交换机exchange,每个队列都绑定到交换机。实现一个消息被多个消费者消费。
    队列如果不绑定到交换机,消息丢失,交换机没有存储能力。

  4. 路由器模式

    路由器模式

  5. 主题模式

    主题模式

  6. RPC模式

  • MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。

  • 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

    路由器模式

  • RabbitMQ中实现RPC的机制是:

    • 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)。
    • 服务器端收到消息并处理;服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性;客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

总结

RabbitMQ支持集群化、高可用部署架构、消息高可靠支持,对复杂系统的解耦以及复杂链路的路由调用和不弱的吞吐量,足以应付大部分业务需求了。

  • 比较RabbitMQ和Kafka
    Kafka定位在日志等方面,而RabbitMQ就是按照AMQP来设计作为消息队列,如果单纯的只是作为消息队列来使用,RabbitMQ足够胜任了。虽然RabbitMQ吞吐量差于Kafka,但是Kafka没有RabbitMQ对业务支持得好。