• 消息队列的作用:

    • 解耦
    • 削峰
    • 异步
    • 日志处理
  • 什么是消息队列?

    消息队列(MQ)指保存消息的一个容器,本质是个队列。

    但这个队列,需要支持高吞吐,高并发,并且高可用

  • 业界常见的消息队列:

    Kafka:分布式的、分区的、多副本的日志提交服务,在高吞吐场景下发挥较为出色
    RocketMQ:低延迟、强一致、高性能、高可靠、万亿级容量和灵活的可扩展性,在一些实时场景中运用较广
    Pulsar:是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用存算分离的架构设计
    BMQ:和Pulsar架构类似,存算分离,初期定位是承接高吞吐的离线业务场景,逐步替换掉对应的Kafka集群

Kafka

  • 如何使用Kafka

    创建集群→新增Topic→编写生产者逻辑→编写消费者逻辑

基本架构

  • 基本概念:

    Topic:逻辑队列,不同Topic可以建立不同的Topic

    Cluster:物理集群,每个集群中可以建立多个不同的Topic

    Producer:生产者,负责将业务消总发送到Topic中

    Consumer:消费者,负责消费Topic中的消息

    ConsumerGroup:消费者组,不同组Consumer消费进度互不干涉

    Partition【分片】:通常topic会有多个分片,不同分片之间消息是可以并发来处理的,这样提高单个Topic的吞吐

    Offset:消息在partition内的相对位置信息,可以理解为唯一ID,在partition内部严格递增。

    Replica:分片的副本,分布在不同的机器上,可用来容灾,Leader对外服务 ,Follower异步去拉取leader的数据进行一个同步,如果Leader挂掉了,可以将Follower提升成Leader再对外进行服务;每个分片有多个Replica,Leader Replica将会从ISR(In-Sync Replicas)中选出

    ISR:意思是同步中的副本,对于Follower来说, 始终和leader是有一定差距的,但只有当这个差距比较小的时候,我们才可以将这个follower副本加入到ISR中,不在ISR中的副本是不允许提升成Leader的(下图中的Replica3与Leader差距过大,不允许加入ISR)

  • Kafka中副本分布图示例

    上面这幅图代表着Kafka中副本的分布图。图中一个Broker代表一个Kafka的节点, 所有的Broker节点最终组成了一个集群。

    图中整个集群,包含了4个Broker机器节点,集群有两个Topic,分别是Topic1和Topic2, Topic1有两个分片, Topic2有1个分片,每个分片都是三副本的状态。

    这里中间有一个Broker同时也扮演 了Controller的角色,Controller是整个集群的大脑,负责对副本和Broker进行分配。

  • Kafka整体架构:

    在集群的基础上,还有一个模块是ZooKeeper, 这个模块存储了集群的元数据信息,比如副本的分配信息等等,Controller计算好的方案都会放到这个地方

生产消费流程(好的特性)

  • 一条消息的“自述”:

    Producer→Broker→Consumer

    • Producer批量发送消息(Batch)

      批量发送可以减少IO次数,从而加强发送能力

      增大吞吐

    • Producer数据压缩

      通过压缩,减少消息大小,降低带宽流量,以应对可能出现的网络带宽不足的问题。目前支持Snappy、 Gzip、LZ4、ZSTD压缩算法

    • Broker采用顺序写(末尾追加)的方式进行写入,以提高写入效率

    • Broker消息索引(稀疏索引)

      寻找消息的机制

    • Broker零拷贝:

      减少拷贝次数,加快消息数据传输速度

      Consumer从Broker中读取数据,通过sendfile的方式, 将磁盘读到os内核缓冲区后,直接转到NIC buffer进行网络发送
      Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入(从用户态直接写入磁盘,不用再写内核态,减少内存拷贝次数)

    • Consumer的rebalance(关于Consumer如何分配Partition

      背景是Consumer可以并发消费Partition,那么一个Consumer要如何确定自己该消费哪个Partition呢?

      (Low Level)手动分配:在启动consumer的时候预先设定好该consumer会消费的partition;优点是快,但是这样有许多弊端,不能容灾,且不够灵活

      (High Level)自动分配:对于不同的Consumer Group来讲,都会在Broker集群中选取一台Broker当做Coordinator,而Coordinator的作用就是帮助Consumer Group进行分片的分配,也叫做分片的rebalance,使用这种方式,如果ConsumerGroup中有发生宕机,或者有新的Consumer加入,整个partition和Consumer都会重新进行分配来达到一个稳定的消费状态

      • Consumer Rebalance过程示意图

    总结:可以帮助Kafka提高吞吐或者稳定性的功能
    Producer:批量发送、数据压缩
    Broker:顺序写,消息索引,零拷贝
    Consumer:Rebalance

存在的问题

  • Kafka数据复制问题:

    由一些特殊场景下的节点大量【重启/替换/扩容/缩容】(Kafka集群的这类运维操作都会导致节点的变动,都会有数据复制带来的时间成本问题)引发的,会使某个Broker承载大量帮其他节点进行数据同步的时间开销

  • Kafka负载不均衡:

    场景:

    这个场景当中,同一个Topic有4个分片,两副本(因为Partition分布在两个Broker上,即两个机器上),可以看到,对于分片1来说,数据量是明显比其他分片要大的,当我们机器IO达到瓶颈的时候,可能就需要把第一台Broker上面的Partition3迁移到其他负载小的Broker上面,但这个为了降低Broker1的IO,平衡副本之间负载的迁徙操作同样是数据复制操作,同样会引起Broker1的IO升高。所以就需要权衡IO设计出一个极其复杂的负载均衡策略。

  • Kafka问题总结

    1. 运维成本高

    2. 对于负载不均衡的场景,解决方案复杂

    3. 没有自己的缓存,完全依赖Page Cache

    4. Controller和Coordinator和Broker在同一进程中,大量IO会造成其性能下降

      【Controller是整个集群的大脑,负责对副本和Broker进行分配】

      【Coordinator的作用就是帮助Consumer Group进行分片的分配,也叫做分片的rebalance

BMQ

兼容Kafka协议,存算分离,云原生消息队列

架构模型

  • BMQ架构图

    特点:

    • 兼容Kafka,所以Producer和Consumer都兼容

    • 加入Proxy Cluster和Broker Cluster,且Controller和Coordinator被独立出来进行部署

    • 存算分离,可以在底层用另外的分布式存储系统Distributed Storage System进行

    • Meta Storage System类似Kafka中的Zoo Keeper,用于存储集群的元数据信息,比如副本的分配信息等等,Controller计算好的方案都会放到这个地方

    • 读写分离(类似数据库):Proxy Cluster处理读请求,Broker Cluster处理写请求

      得益于存算分离+读写分离,Proxy和Broker都是无状态的,不会存数据,没有数据复制,降低了运维成本:

  • BMQ不会出现Kafka中的负载不均问题

    BMQ底层是借助HDFS进行存储的,HDFS写文件流程:

    通过前面的介绍,我们知道了,同一个副本是由多个segment组成,我们来看看BMQ对于单个文件写入的机制是怎么样的,首先客户端写入前会选择一定数量的DataNode,这个数量是副本数,然后将一个文件写入到这三个节点上, 切换到下一个segment之后,又会重新选择三个节点进行写入。这样一来,对于单个副本的所有segment来讲,会随机的分配到分布式文件系统的整个集群中

    • 和Kafka比较:

      对于Kafka分片数据的写入,是通过先在Leader上面写好文件,然后同步到Follower上,所以对于同一个副本的所有Segment都在同一台机器上面。这样就会存在之前我们所说到的单分片过大导致负载不均衡的问题。

      但在BMQ集群中,因为对于单个副本来讲,segment是随机分配到不同的节点上面的,因此BMQ不会存在Kafka的负载不均问题

读写流程

  • Broker-Partition状态机【写入过程】

    保证任意分片在同一时刻只能在一个Broker上存活

    对于写入的逻辑来说,我们还有一个状态机的机制,用来保证不会出现同一个分片在两个Broker上同时启动的情况,另外也能够保证一个分片的正常运行。

    首先,Controller做好分片的分配之后,如果当前分片分到了某个Broker,首先会start这个分片,然后进入Recover状态,这个状态主要有两个目的:

    1. 获取分片写入权利:也就是说,对于hdfs来讲,只会允许我一个分片进行写入,只有拿到这个权利的分片才能写入
    2. 应对异常情况下的恢复:第二个目的是如果上次分片是异常中断的,没有进行save checkpoint,这里会重新进行一次save checkpoint,然后就进入了正常的写流程状态,创建文件,写入数据,到一定大小之后又开始建立新的文件进行写入。

    状态机图中的Recover状态详解:

    Writer Thread下面的流程对应上一张图白框里的流程

    数据校验: CRC, 验证参数是否合法
    校验完成后,会把数据放入Buffer中,通过一个异步的Write Thread线程将数据最终写入到底层的存储系统当中
    这里有一个地方需要注意一下, 就是对于业务的写入来说,可以配置返回方式,可以在写完缓存之后直接返回,另外也可以等数据真正写入存储系统后再返回,对于这两个来说前者损失了数据的可靠性,带来了吞吐性能的优势,因为只写入内存是比较快的,但如果在下一次flush前发生宕机了, 这个时候数据就有可能丢失了,后者的话,因为数据已经写入了存储系统,这个时候也不需要担心数据丢失,相应的来说吞吐就会小一些。
    我们再来看看Thread的具体逻辑,(Write Data)首先会将Buffer中的数据取出来, 调用底层写入逻辑,(flush)在一定的时间周期上去flush, (Build Index)flush完成后开始建立Index,也就是offset和timestamp对于消息具体位置的映射关系;Index建立好以后,会save一次checkpoint, 也就表示,checkpoint后的数据是可以被消费的,我们想一下, 如果没有checkpoint的情况下会发生什么问题,如果flush完成之后宕机,index还没有建立,这个数据是不应该被消费的,可能会导致数据和index不对应。有了checkpoint就可以保证宕机发生后,也会先Build Index,再进行数据的写入。

    最后当文件到达一定大小之后,需要建立一个新的segment文件来写入。


    状态机图中的Failover状态详解:

    如果在向HDFS中写入segment时出现某个DataNode不能写入,这个时候不会等待这个DataNode恢复,为了保证高可用性,会重新寻找可以写入的DataNode进行写入

  • Proxy【读取流程】

    Wait阶段详解:

    consumer发送的Fetch Request如果没有fetch到指定大小的数据,proxy会先等待一段时间,再返回用户侧,这样就可以降低fetch请求的IO次数

    经过Wait流程后,Proxy会先到Cache中查找数据,找到直接返回,未找到再去存储系统中查找,首先open file,再通过Index找到数据的位置,再读取数据并返回。

  • BMQ多机房部署:

    线上高可用服务除了防止单机故障的意外影响,也要防止机房级别故障带来的影响。

    Proxy对Broker是一对多跨机房访问的

高级特性

泳道→Databus→Mirror→Index→Parquet

  • 泳道:

    解决主干泳道流量隔离问题以及泳道资源重复创建问题

    为了测试环境重新建立一个Topic

  • DataBus

    Kafka生产者代码:

    直接使用原生SDK会有什么问题?

    1.客户端配置较为复杂

    2.不支持动态配置,更改配置需要停掉服务

    3.对于latency不是很敏感的业务,batch 效果不佳

    Databus的架构:

    对应上面原生SDK的缺陷,Databus的优势:

    1.简化消息队列客户端复杂度

    2.解耦业务与Topic

    3.缓解集群压力,提高吞吐(对于高吞吐的业务比较友好,但是对于latency要求高的业务不太友好)

  • Mirror

    解决跨Region的问题(Region是较大的地区概念,比如不同国家)

    不能采取之前多机房部署的方式,那样写数据的延迟会非常高。

    可以在不同Region之间部署Mirror,进行Region之间的异步同步,如下图所示:

    使用Mirror通过最终一致的方式,解决跨Region读写问题

  • Index

    如果希望通过写入的LogID、UserID或者其他业务字段进行消息的查询,应该怎么做?

    直接在BMQ中将数据结构化,配置索引DDL,异步构建索引后,通过Index Query服务读出数据。

  • Parquet

    Apache Parquet是Hadoop生态圈中一种新型列式存储格式,它可以兼容Hadoop生态圈中大多数计算框架(Hadoop、Spark等), 被多种查询引擎支持(Hive、Impala、 Dill等)。

    • 行式存储和列式存储:

      • 行式存储在查询某个表中所有的列数据时更快,如select * from table where id = 1
      • 列式存储在查询某个特定的列时更快

      显然目前列式存储相应的查询方式更为多见

    对于Producer生产的消息,原本的存储方式更像是列存储,可以通过存入Parquet Engine,将数据结构化,使用不同的方式构建Parquet格式文件,方便后续的数据分析

总结:

  1. BMQ的架构模型(解决Kafka存在的问题)
  2. BMQ读写流程(Failover 机制,写入状态机)
  3. BMQ高级特性(泳道、Databus、Mirror、Index、 Parquet)

RocketMQ

  • RocketMQ基本概念

  • RocketMQ架构

    数据流也是通过Producer发送给Broker集群,再由Consumer进行消费

    Broker节点有Master和Slave的概念

    NameServer为集群提供轻量级服务发现和路由

  • RocketMQ存储模型:

    接下来我们来看看RocketMQ消息的存储模型,对于一个Broker来说所有的消息的会append到一个CommitLog 上面,然后按照不同的Queue,重新
    Dispatch到不同的Consumer中,这样Consumer就可以按照Queue进行拉取消费 ,但需要注意的是,这里的ConsumerQueue所存储的并不是真实的数据,真实的数据其实只存在CommitLog中,这里存的仅仅是这个Queue所有消息在CommitLog上面的位置,相当于是这个Queue的一个密集索引

  • RocketMQ高级特性

    事务场景

    在下面的场景中,【库存记录-1】和【消息队列】两步需要保证在同一个事务内

    image-20230209164232759

    即在Producer添加了一个两阶段提交

    延迟发送

    image-20230209164336108

    可以延迟发送消息队列中的消息

    原理是添加了ScheduleTopic来单独处理需要延迟发送的消息,在到了延迟的时间后再回写进CommitLog

    处理失败

    消费消息失败之后会进行重试,超过一定的重试次数会送入Dead Letter死信队列,不会再重试尝试发送

  • RocketMQ小结:

    1. RocketMQ的基本概念(Queue, Tag)
    2. RocketMQ的底层原理(架构模型、存储模型)
    3. RocketMQ的高级特性(事务消息、重试和死信队列,延迟队列)

总结

  • 前世今生:消息队列发展历程
  • Kafka:基本概念、架构设计、底层原理、架构缺点
  • BMQ:架构设计、底层原理、Kafka 比较、高级特性
  • RocketMQ:架构设计、底层原理、高级特性