RabbitMQ中的AMQP
AMQP,Advanced Message Queuing Protocol,高级消息队列协议.
它是一个提供统一消息服务的应用层二进制协议,为面向消息的中间件设计。它是基于TCP/IP协议构造的协议
基于此协议的客户端与消息中间件可传递消息,可跨平台传输。
为什么要用RabbitMQ?有什么好处?场景是什么?
结合黑马点评来说。
异步处理:对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现,性能高。
流量消峰:在高并发场景下,消息队列可以作为一个缓冲区,暂时存储大量请求,避免系统因瞬时高流量而崩溃。比如秒杀中的订单。
应用解耦:消息队列允许生产者和消费者之间通过消息进行通信,而不需要直接调用对方的接口。
RabbitMQ 中有哪些重要的角色?
- 生产者:消息的创建者,负责创建和推送数据到消息服务器;
- 消费者:消息的接收方,用于处理数据和确认消息;
- 代理:就是 RabbitMQ 消息队列本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
RabbitMQ对应的架构如图:
其中包含几个概念:
publisher
:生产者,也就是发送消息的一方consumer
:消费者,也就是消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
还有其他什么消息队列?为什么使用RabbitMQ,优缺点?
还有RocketMQ,Kafka
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ可靠性比较高:生产者确认、消费者确认、消息持久化。
灵活的路由机制:支持多种交换机类型,可以满足复杂的消息路由需求。
RabbitMQ 拥有友好的管理界面和丰富的文档资源,易于上手和维护。
缺点:
使用erlang实现,不利于二次开发和维护;
在高并发场景下,单机性能可能不如 Kafka ,
交换机的类型
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- headers:不依赖于routing key与binding key的匹配规则,而是根据发送消息内容中的headers属性进行匹配;除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多(目前几乎用不到了)
消息基于什么传输?为什么使用信道?
RabbitMQ 使用信道的方式来传输数据。
- 由于TCP连接的创建和销毁开销较大。
- TCP并发数受系统资源限制,会造成性能瓶颈。
- 信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制
消息属性和有效载荷(消息主体)
Content type: 内容类型
Content encoding: 内容编码
Routing Key: 路由键
Delivery mode: 投递方式(持久化 or 非持久化)
Message priority: 消息优先权
Message publishing timestamp: 消息发布的时间戳
Expiration period: 消息的有效期
Publisher application id: 发布应用的id
怎么设置消息的过期时间?
- 在生产端发送消息时,给消息设置过期时间,单位毫秒(ms)
// 设置"tyson"消息时间为3000毫秒
Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");
- 在消息队列创建队列时,指定队列的TTL,从消息入队列开始计算,超过该时间的消息将会被移除。
如何确保消息不丢失?
生产者连接重试
就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate
与MQ连接超时后,多次重试。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数。
生产者确认
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大多都由编程错误导致(RoutingKey填错了,名称写错了,忘记bind)
默认两种机制都是关闭状态,需要通过配置文件来开启。
Publisher Return:当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
Publisher Confirm:
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
MQ的可靠性(持久化)
RabbitMQ 提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。
为了保证数据的可靠性,必须配置数据持久化,包括:交换机持久化、队列持久化、消息持久化
当发布一条消息到交换机上时,RabbitMQ 会先把消息写入持久化日志文件,然后才向生产者发送响应。
一旦消费者从持久队列中消费了一条持久化消息并且做了确认,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集,从而移除这条消息。
如果持久化消息在被消费之前RabbitMQ重启,服务器会自动重建交换机和队列(以及绑定),并重新加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。
消费者确认机制
即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
**none**
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用**manual**
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活**auto**
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack
; - 如果是消息处理或校验异常,自动返回
reject
;
- 如果是业务异常,会自动返回
消费者重试
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。
如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
但是这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
总结
- 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
- 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
如何解决消息积压的问题
要么是发送变快了。要么是消费变慢了。
- 可以通过 扩容消费端的实例数来提升总体的消费能力。
- 如果短时间内没有足够的服务器资源进行扩容,那么就将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
如何处理消息堆积情况?几千万条数据在MQ里积压了七八个小时
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
- 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
由于消息积压导致过期被清理了怎么办
不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
等过了高峰期以后,这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。
消费端怎么进行限流?
当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器崩溃。这种情况下需要对消费端限流。
Spring RabbitMQ 提供参数 prefetch 参数指定了消费者在处理完当前消息之前,可以从队列中预取的消息数量。
- 例如,如果
prefetch=10
,消费者会一次性从队列中拉取 10 条消息到本地缓存,然后逐条处理。 - 如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息。
默认是不设置prefetch,RabbitMQ 默认会尽可能多地将消息推送给消费者(不是轮寻),可能会导致以下问题:如果某个消费者处理速度较慢,它可能会堆积大量未处理的消息,而其他消费者可能处于空闲状态。
重复消费(业务幂等性)
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
查询和删除是幂等的,但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
- 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
- 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
- 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
唯一消息ID解决:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
业务判断:
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。
因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
比如修改订单状态的时候,只有原来是未支付,我才修改为已支付。
如何保证消息的有序性?
RabbitMQ 本身并不完全保证消息的全局有序性。
只能通过合理的配置和设计来实现消息的有序性。
单队列单消费者:如果只有一个队列和一个消费者,RabbitMQ 会严格按照消息进入队列的顺序将消息传递给消费者。
重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作,如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。
延迟消息
例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。
法1:死信交换机+TTL
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。
而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
如何实现?
ttl.queue根本没有消费者,最终消息会过期,然后被放到死信交换机,到达延迟队列。
注意这里的ttl.fanout
不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct
才能正确路由消息。
法2:DelayExchange插件
- 声明延迟交换机
- 发送延迟消息时:必须通过x-delay属性设定延迟时间:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。
如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。 但是大多数情况下用户支付都会在1分钟内完成,我们发送的消息却要在MQ中停留30分钟,额外消耗了MQ的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第30分钟才检测。 例如:我们在用户下单后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、...30分分别设置延迟消息,如果监视器提前发现订单已经支付,则后续的检测取消即可。这样就可以有效避免对MQ资源的浪费了。
你了解LazeQueue吗?
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。
此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,在3.12版本之后,LazyQueue已经成为所有队列的默认格式
- 接收到消息后直接存入磁盘而非内存。
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储