MQ - RabbitMQ全局流量控制

2023年10月2日
大约 6 分钟

MQ - RabbitMQ全局流量控制

当消息积压时,消息会进入到队列深处,消费消息会使服务器性能大大降低。而内存告警和磁盘告警就是通过设置阈值来预防此情形,当达到阈值后,阻塞集群中所有的Connection,直到对应项恢复正常,属于全局性的流量控制 (Global Flow Control)。而这里将要提到的流控是针对连接Connection来的(Per-Connection Flow Control或者Internal Flow Control)。
流控作用:

  • 流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形,从而保障服务器资源在安全范围内,提高业务的稳定性。

1. 流控机制

流控是对什么进行控制?

-是对进程邮箱的阈值进行控制。

rabbitmq进程邮箱:

  • 进程通信方式有四种,分别为主从式、会话式、消息传递(邮箱机制)、共享存储区。而Erlang 进程之间并不共享内存 (binary 类型的除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱(mailbox)。
  • 默认情况下,Erlang并没有对进程邮箱的大小进行限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出而崩溃。
  • 在rabbitmq中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到内存阈值。

流控机制是什么?

  • rabbitmq使用了一种基于信用证算法(credit-based algorithm)的流控机制,来限制发送消息的速率来解决进程邮箱大小达到内存阈值而导致服务器崩溃问题。
  • 通过监控各个进程的进程邮箱,当某个进程负载过高而来不及处理消息时,这个进程的进程邮箱就会开始堆积消息。
  • 当堆积到一定量时,就会阻塞而不接收上游的新消息。从而慢慢地,上游进程的进程邮箱也会开始堆积消息。当堆积到一定量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程阻塞而暂停接收新的数据。

2. 流控原理

流控原理流程:

  • 进程XXX——> 进程A——> 进程B——> 进程C—— 进程…,每个进程中都有一对关于收发消息的credit值。
  • 拿进程B来看,有两个参数:
  • {{credit_from,C},value]参数,表示能发送多少条消息给进程C,每发送一条消息该值减 1,当为 0 时,进程 B 不再往进程 C 发送消息也不再接收进程 A 的消息。
  • [{credit_to,A},value}参数,表示再接收多少条消息就向进程A 发送增加credit值的通知,进程 A 接收到该通知后就增加{{credit_from,B),value]所对应的值,这样进程 A 就能持续发送消息。
    • 当上游发送速率高于下游接收速率时,credit值就会被逐渐耗光,此时进程被阻塞,阻塞情况会一直传递到最上游。
    • 当上游进程收到来自下游进程的增加 credit 值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始按收更上游进程的消息,一个一个传导最终能够解除最上游的阻塞状态。
    • 综上分析可知,基于信用证的流控机制最终将消息发送进程的发送速率限制在消息处理进程的处理能力范围之内。

图

3. 流控状态显示

当一个连接(Connection)触发流控时会处于“flow”的状态,代表此Connection状态每秒在 blocked 和 unblocked 之间来回切换数次。以此可以将消息发送的速率控制在服务器能够支撑的范围之内。web页面可以查看,也可以通过 rabbitmactl list_connections 命令查看。

图

4. 流控对象

流控机制对象:

  • 连接(Connection)
  • 信道 (Channel)
  • 队列(Queue)

如下图,从Connection进程——> Channel进程——> Queue进程——> 消息持久化存储进程,是一条消息从接收到存储的一个必需的流控连,对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。

图

主要进程:

  • rabbit_reader进程:连接的处理进程,负责接收、解析 AMQP 协议数据包等。
  • rabbit_channel进程:信道的处理进程,负责处理AMQP协议的各种方法、进行路由解析等。
  • rabbit_amqqueue_process进程:队列的处理进程,负责实现队列的所有逻辑。
  • rabbit_msg_store进程:负责实现消息的持久化。

各进程状态情形分析:

  • 当某个Connection处于 flow 状态时,但这个 Connection 中没有一个 Channel 处于 flow 状态,代表这个Connection中有一个或者多个 Channel 出现了性能瓶颈。
  • 存在情形:某些 Channel进程的运作(比如处理路中逻辑) 导致服务器 CPU 的负载过高,尤其是在发送大量较小的非持久化消息时最容易发生。
  • 当某个 Connection处于flow 状态,这个Connection 中也有若干个 Channel 处于 flow状态,但没有任何一个对应的队列处于 flow 状态时,这就意味着有一个或者多个队列出现了性能瓶颈。
  • 存在情形:消息存入队列时导致服务器CPU 负载过高;持久化消息入盘导致服务器 I/O 负载过高,尤其是在发送大量较小的持久化消息时最容易发生。
  • 当某个 Connection处于 flow 状态,这个 Connection 中的若干个 Channel 处于 flow状态,并且也有若干个对应的队列处于 flow 状态时,这就意味着在消息持久化时出现了性能瓶颈。
  • 存在情形:持久化消息入盘导致服务器 I/O 负载过高,尤其是在发送大量较大的持久化消息时最容易发生。

5. 性能提升

一般情况下,向 一个队列里推送消息时,往往会在rabbit_amqqueue_process中(即队列进程中) 产生性能瓶颈。

提升队列性能方式:

  • 第一种,若Erlang版本在18.x以上,可以开启HiPE 功能,可以提高 30%~40%的性能。
  • 第二种,代码层面提升,将单个rabbit_amqqueue_process替换成多个rabbit_amqqueue_process。这里并不是使用多个队列,而是将交换器、队列、绑定关系、生产和消费的方法全部进行封装,这样对于应用来说好是在操作一个(逻辑) 队列。至于怎么封装的,纯运维就不用去了解了,开发就自行百度测试。

图

引用资料

  • https://blog.csdn.net/yi_qingjun/article/details/128496818