RabbitMQ中的AMQP

AMQP,Advanced Message Queuing Protocol,高级消息队列协议.

它是一个提供统一消息服务的应用层二进制协议,为面向消息的中间件设计。它是基于TCP/IP协议构造的协议

基于此协议的客户端与消息中间件可传递消息,可跨平台传输

img

为什么要用RabbitMQ?有什么好处?场景是什么?

结合黑马点评来说。

异步处理:对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现,性能高。

流量消峰:在高并发场景下,消息队列可以作为一个缓冲区,暂时存储大量请求,避免系统因瞬时高流量而崩溃。比如秒杀中的订单。

应用解耦:消息队列允许生产者和消费者之间通过消息进行通信,而不需要直接调用对方的接口。

RabbitMQ 中有哪些重要的角色?

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器;
  • 消费者:消息的接收方,用于处理数据和确认消息;
  • 代理:就是 RabbitMQ 消息队列本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

RabbitMQ对应的架构如图: image.png 其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

还有其他什么消息队列?为什么使用RabbitMQ,优缺点?

还有RocketMQ,Kafka

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ可靠性比较高:生产者确认、消费者确认、消息持久化。

灵活的路由机制:支持多种交换机类型,可以满足复杂的消息路由需求。

RabbitMQ 拥有友好的管理界面和丰富的文档资源,易于上手和维护。

缺点:

使用erlang实现,不利于二次开发和维护;

在高并发场景下,单机性能可能不如 Kafka ,

交换机的类型

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • headers:不依赖于routing key与binding key的匹配规则,而是根据发送消息内容中的headers属性进行匹配;除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多(目前几乎用不到了)

消息基于什么传输?为什么使用信道?

RabbitMQ 使用信道的方式来传输数据。

  • 由于TCP连接的创建和销毁开销较大。
  • TCP并发数受系统资源限制,会造成性能瓶颈。
  • 信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制

消息属性和有效载荷(消息主体)

Content type: 内容类型

Content encoding: 内容编码

Routing Key: 路由键

Delivery mode: 投递方式(持久化 or 非持久化)

Message priority: 消息优先权

Message publishing timestamp: 消息发布的时间戳

Expiration period: 消息的有效期

Publisher application id: 发布应用的id

怎么设置消息的过期时间?

  • 在生产端发送消息时,给消息设置过期时间,单位毫秒(ms)
// 设置"tyson"消息时间为3000毫秒 
Message msg = new Message("tyson".getBytes(), mp); 
msg.getMessageProperties().setExpiration("3000");
  • 在消息队列创建队列时,指定队列的TTL,从消息入队列开始计算,超过该时间的消息将会被移除。

如何确保消息不丢失?

生产者连接重试

就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数。

生产者确认

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大多都由编程错误导致(RoutingKey填错了,名称写错了,忘记bind) image.png

默认两种机制都是关闭状态,需要通过配置文件来开启。

Publisher Return:当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

Publisher Confirm

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

MQ的可靠性(持久化)

RabbitMQ 提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。

为了保证数据的可靠性,必须配置数据持久化,包括:交换机持久化、队列持久化、消息持久化

当发布一条消息到交换机上时,RabbitMQ 会先把消息写入持久化日志文件,然后才向生产者发送响应。

一旦消费者从持久队列中消费了一条持久化消息并且做了确认,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集,从而移除这条消息。

如果持久化消息在被消费之前RabbitMQ重启,服务器会自动重建交换机和队列(以及绑定),并重新加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。

消费者确认机制

即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • **none**:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • **manual**:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • **auto**:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

消费者重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。

如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

但是这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

总结

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

如何解决消息积压的问题

要么是发送变快了。要么是消费变慢了。

  • 可以通过 扩容消费端的实例数来提升总体的消费能力
  • 如果短时间内没有足够的服务器资源进行扩容,那么就将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

如何处理消息堆积情况?几千万条数据在MQ里积压了七八个小时

一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:

  1. 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
  2. 临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。

  3. 等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

由于消息积压导致过期被清理了怎么办

不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。

等过了高峰期以后,这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。

假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。

消费端怎么进行限流?

当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器崩溃。这种情况下需要对消费端限流。

Spring RabbitMQ 提供参数 prefetch 参数指定了消费者在处理完当前消息之前,可以从队列中预取的消息数量。

  • 例如,如果 prefetch=10,消费者会一次性从队列中拉取 10 条消息到本地缓存,然后逐条处理。
  • 如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息。

默认是不设置prefetch,RabbitMQ 默认会尽可能多地将消息推送给消费者(不是轮寻),可能会导致以下问题:如果某个消费者处理速度较慢,它可能会堆积大量未处理的消息,而其他消费者可能处于空闲状态。

重复消费(业务幂等性)

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

查询和删除是幂等的,但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

唯一消息ID解决:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

业务判断:

例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。

因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

比如修改订单状态的时候,只有原来是未支付,我才修改为已支付。

如何保证消息的有序性?

RabbitMQ 本身并不完全保证消息的全局有序性。

只能通过合理的配置和设计来实现消息的有序性。

单队列单消费者:如果只有一个队列和一个消费者,RabbitMQ 会严格按照消息进入队列的顺序将消息传递给消费者。

重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作,如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。

延迟消息

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

法1:死信交换机+TTL

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。

而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

如何实现?

ttl.queue根本没有消费者,最终消息会过期,然后被放到死信交换机,到达延迟队列。

注意这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。

image-20250302190821924

法2:DelayExchange插件

  • 声明延迟交换机
  • 发送延迟消息时:必须通过x-delay属性设定延迟时间:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。

如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。 但是大多数情况下用户支付都会在1分钟内完成,我们发送的消息却要在MQ中停留30分钟,额外消耗了MQ的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第30分钟才检测。 例如:我们在用户下单后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、...30分分别设置延迟消息,如果监视器提前发现订单已经支付,则后续的检测取消即可。这样就可以有效避免对MQ资源的浪费了。

你了解LazeQueue吗?

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。

此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,在3.12版本之后,LazyQueue已经成为所有队列的默认格式

  • 接收到消息后直接存入磁盘而非内存。
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储
Copyright © 版权信息 all right reserved,powered by aspire-zero and Gitbook该文件修订时间: 2025-03-03 13:56:28

results matching ""

    No results matching ""