0%

kafka 运行机制

本文介绍 kafka 相关运行机制,主要包括:

  1. 分区策略
  2. 消息压缩算法
  3. Kafka 通信方式
  4. 消费者组相关机制

分区策略

kafka的分区机制提供了负载均衡的能力,实现了系统的可伸缩性

我们希望生产者将消息均匀的分发到每个分区中,分区策略有:

  • 轮询策略
  • 随机策略
  • 按消息键保序策略

消息压缩算法

希望以极小的CPU开销带来更少的磁盘占用或更少的网络I/O传输

Q1:什么时候进行压缩?

A1:生产者 或 Broker

Broker 压缩的情况:

  1. Producer压缩消息后,发送到Broker,如果Broker指定了压缩算法,且与Producer的算法不同,则会解压后重新压缩;
  2. Broker 端发生了消息格式的转换,比如 Broker 发现有旧版本的消息,需要解压转换为新版本格式,再压缩

Broker 压缩会影响性能,应该尽量避免由于压缩算法设置不一致,或消息版本不一致导致的重复压缩

Q2:什么时候进行解压?

A2:Broker 或 消费者

除了上面提到的 Broker 压缩前需要解压外,每个压缩过的消息在 Broker 端都会解压,对消息执行各种校验;

Think:Broker 具体校验哪些方面呢,能否可以在不解压的情况下校验呢?

有哪些压缩算法?

  • GZIP
  • Snappy
  • LZ4
  • Zstandard(zstd)

四种压缩算法的比较:

  • 压缩比:zstd > lz4 > gzip > snappy
  • 吞吐量:lz4 > snappy > zstd 和 gzip

即:如果希望节省网络带宽,则使用 zstd;如果追求速度,则使用lz4

如何防止 kafka 消息丢失

可能导致消息丢失的场景:

  1. 生产者端丢失:发送过程中由于网络问题、消息不合格等原因,且生产者未感知
  2. 消费者端丢失:先更新offset,后消费消息,如果更新offset后消费消息的线程中断,则它负责的消息没有处理,但是 offset 却更新了

对于上述两种场景的解决方案是:

  1. 生产者端:使用带有回调的 send 方法,在回调中发现失败进行重试等处理
  2. 消费者端:先消费,成功后再提交更新 offset

防止消息丢失的最佳实践:

  1. 【生产者端】使用 producer.send(msg, callback)
  2. 【生产者端】设置 acks = all,表示全部 broker 都收到消息,才认为是已提交
  3. 【生产者端】设置 retries 为一个较大值,自动重试消息发送
  4. 【Broker端】unclean.leader.election.enable = false,不允许一个落后原先 leader 太多的 broker 成为新的 leader
  5. 【Broker端】设置 replication.factor >= 3,将消息多保存几份
  6. 【Broker端】设置 min.insync.replicas > 1 消息至少要被写入多少个副本才算是已提交
  7. 【Broker端】确保 replication.factor > min.insync.replicas,推荐 replication.factor = min.insync.replicas + 1
  8. 【消费者端】消费成功后再提交

Kafka 的通信方式

生产者是如何和Broker通信的

通信协议:TCP

什么时候建立连接:

  1. 创建 producer 实例时,生产者应用会在后台创建一个线程,该线程会与 bootstrap.server 参数中指定的所有 Broker 创建连接
  2. Producer 与 bootstrap.server 的 Broker 请求 kafka 集群 Metadata,拿到集群的全部 Broker 后,Producer 会与未建立连接的 Broker 建议连接,这样 Producer 就与集群中所有 Broker 都建立了连接
  3. Producer 在发送消息时,如果发现与目标 Broker 未建立连接,会创建连接

消费者组

消费者组的作用

使消费者的水平扩展,提高消费能力

位移是如何管理的

不同的版本位移的管理是不同的

2.8.0 之前:zookeeper

2.8.0 之后:kafka 内部主题 __consumer_offsets

位置放在 zookeeper 中存在的问题:zookeeper 不适合频繁的写更新,大吞吐量的写操作会极大地拖慢 zooKeeper 集群的性能

位移主题:保存消费者的位移信息,需要保证持久性和高吞吐量

consumer 如何提交位移:

  • 自动提交

    参数

    enable.auto.commit = true

    auto.commit.interval.ms = 5000

    执行流程:周期性的 poll(拉取)消息前,都提交上次 poll 的位移

    优缺点:可以保证消息不丢失,但可能重复消费

    为什么会重复消费:在一个周期内发生重平衡,重平衡后再开始消费,由于本周期的位移没有提交,拿到的位移是本周期前的,导致重平衡前本周期消费的消息被重复消费

  • 手动-同步提交

    kafkaConsumer#commitSync()

    优缺点:灵活可控,提交失败可以自动重试,但提交过程中会阻塞消费者进程,也有重复消费的问题(已消费未提交位移时重启、重平衡)

  • 手动-异步提交

    kafkaConsumer#commitAsync()

    优缺点:灵活,异步提交,不阻塞消费进程,缺点是发现提交失败,也无法重试,因为重试提交的也不是最新的位移,只会导致重复消费

综上,无论用哪种提交位移的方式,都会产生重复消费,所以需要消费者自己处理重复消费,如天然是幂等的,或去重

消费者重平衡

何时发生重平衡:

  • 消费者组成员数量发生变更
  • 订阅主题数发生变更
  • 订阅主题的分区数发生变更

重平衡的影响:在重平衡过程中,组内所有实例都不能消费任何消息,影响了消费者的 TPS

消费者线程

从 0.10.1.0 版本开始,kafka consumer 采用了用户主线程和心跳线程的双线程设计,是为了将心跳线程与主线程的 poll 方法分开,防止因消费时间过长无法及时 poll 导致的重平衡

在单节点上,消费线程是单线程。如果希望多线程消费,可以采用下面几种方案:

  • 方案一:启动多个消费者线程,即拉取消息又处理消息
  • 方案二:单(多)消费者线程负责拉取消息,多线程处理

方案一就是在一个节点上启动了多个消费者,和启动多个消费者节点是一样的,所以启动的消费者数受限于主题分区数,如果启动了大于分区数的消费者,那么多启动的线程也是空闲的

方案二将拉取消息与处理消息分开,如果拉取消息是多线程,与方案一相同,多线程处理消息可以提高消费速度,但是不能保证消息有序消费;多线程消费需要手动提交位移,需要感知到当前 poll 的消息全部处理完

位移管理

消费者接入到一个 partition 后从哪个位置开始消费,与 auto.offset.reset 有关:

  • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • nearlist: 当各分区下有已提交的offset时,从提交的offset开始消费;如果小于 beginingOffset 从 earliest 开始,如果超过 endOffset 从 latest 开始
offset.drawio

消费者负载不均衡

消费模式

顺序消费

通过分配规则,对消息 key 计算出对应的 partition,发送到指定的 partition 中

Q:什么情况会破坏顺序消费?

A:批量消费,没有去重,发生 Rebalance 时,可能破坏顺序

比如批量 poll 了一批消息 [a1, a2, a3, a4, a5],已经消费到 a4,还没有提交位移,这时发生了 rebalance,则 rebalance 后会重新消费 a1, a2, a3, a4(不去重的情况),这时发生了先消费 a3,后消费 a1 的情况

延时消费

见延时消费

集群设置

broker 负载均衡

consumer 负载均衡

负载均衡策略(Rebalance 策略)

  • Range 算法

    1. 对每个 topic 里面的分区按序号排序(P0, P1, P2,...),对消费者按字母顺序排序(C0, C1, C2, ...)
    2. 对每个 topic 都执行操作:将分片依次分配给消费者

    存在的问题:多个 topic,且分区大于消费者个数时,排序靠前的消费者会分配到更多的分区,比如有 N 个 topic,每个 topic 的分区数都比消费者个数多一个,则 C0 比其他消费者多分配 N 个分区

  • RoundRobin 算法

    1. 遍历主题列表,从每个主题获取其所有分区的列表,组成分区集合 P
    2. 对 P 中分区列表进行排序,首先按照主题名称,然后按照分区编号进行排序
    3. 对消费者实例进行排序,保证分配过程中的顺序是确定的
    4. 系统遍历P中的分区列表,从第一个消费者开始,依次分配一个分区,直至分配完列表中的所有分区

    存在问题:

  • Sticky 算法

    目标:

    1. 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个
    2. 分区的分配尽可能的与上次分配的保持相同

    步骤:

    1. 按照 RoundRobin 的方式分配
    2. 发生重平衡时,将离线消费者的分区分配给在线消费者,保证分区差不超过1个

消费者组不均衡的场景

  • 场景1:消费者组订阅了多个topic,而 topic 之间流量差异很大,默认的分配策略是不考虑 topic 具体流量的,只会按分区数分配
  • 场景2:有一些内置 topic,如 RETRY_TOPIC,这些内置 topic 平时流量很少,但也参与分配;这就可能导致一些实例上的消费者只分配了内置 topic 的分区,而一些实例上分配了正常 topic 的分区,导致负载不均(与场景1其实是同一个问题)

解决方案:

  • 场景1:给不同 topic 设置权重,按照权重分配
  • 场景2:给内置 topic 设置权重为0

服务实例分配分区不均衡的场景

由于分区分配策略是在消费者组内部的,不会考虑其他消费者组已分配的分区,当服务实例在多个消费者组中,容易出现不均衡的现象,下面是可能发生的场景:

  • 场景1:使用 RoundRobin 策略,3个实例(A,B,C),在2个消费组内,订阅2个Topic,各4个分区(P0-0, P0-1, P0-2, P0-3, P1-1, P1-2, P1-3, P1-4)

    分配后的结果为:A分配了P0-0,P0-3,P1-1, P1-4, B分配了P0-1,P1-2,C分配了P0-2,P1-3,可以发现A比B,C多分配了两个分区,如果在更多的消费组中,则这个问题会更严重

  • 场景2:使用 Sticky 策略,3个实例(A,B,C)同上

    先看一个消费组内,在启动服务的过程中,由于分配策略和服务启动先后有关,则一定有一个实例多分配了一个,在N个消费组的情况下,这个实例会被多分配N个分区(因为不同消费组的分配策略是一致的)

  • 场景3:使用任何一种策略,10个实例,在30个消费者组中,30个topic,每个topic4个分区

    分配后,肯定存在4个实例,被分配了30个分区,6个实例,分配0个分区

实验验证

只验证场景1,2,场景3同理

3个Topic,每个 topic 4个分区,3个消费者组,每组3个消费者,client.id 依次是 c1, c2, c3

使用 RoundRobin 和 Sticky 策略,观察分区分配的结果

RoundRobin策略

分配结果 分区统计
第一个消费者上线 所有分区都是C1 C1:12个
第二个消费者上线 T0-P0,P2:C1
T0-P1,P3:C2
T1-P0,P2:C1
T1-P1,P3:C2
T2-P0,P2:C1
T2-P1,P3:C2
C1:6个
C2:6个
第三个消费者上线 T0-P0,P3:C1
T0-P1:C2
T0-P2:C3
T1-P0,P3:C1
T1-P1:C2
T1-P2:C3
T2-P0,P3:C1
T2-P1:C2
T2-P2:C3
C1:6个
C2:3个
C3:3个

Sticky 策略

分配结果 分区统计
第一个消费者上线 所有分区都是C1 C1:12个
第二个消费者上线 T0-P0,P1:C1
T0-P2,P3:C2
T1-P0,P1:C1
T1-P2,P3:C2
T2-P0,P1:C1
T2-P2,P3:C2
C1:6个
C2:6个
第三个消费者上线 T0-P0,P1:C1
T0-P2:C2
T0-P3:C3
T1-P0,P1:C1
T1-P2:C2
T1-P3:C3
T2-P0,P1:C1
T2-P2:C2
T2-P3:C3
C1:6个
C2:3个
C3:3个

可以发现,在RoundRobin策略和Sticky策略中,C1比C2,C3多分配3个分区

解决方案

上面两个场景的本质是相同的,都是多消费者组分配分区执行结果一致,导致部分实例被多分配

场景1:不同消费组对消费者排序后,机器的顺序是相同的,排序的算法对 client.id 的 hashCode 进行排序的,所以只要让同一实例上不同组内的消费者 client.id 不同,就可以避免这个问题

场景2:Sticky分配策略与启动顺序有关,且消费者进程和服务启动是同步的,解决方案是将消费者进程的启动和实例的启动解耦,通过随机睡眠的方式,利用消费者进程启动的随机性,避免这个问题

场景3:无论使用哪种策略,都要加入随机睡眠

Reference

如何为Kafka集群选择合适的Topics/Partitions数量

kafka的 groupid 与 auto.offset.reset

Kafka Note(六)负载均衡机制

kafka 分区分配策略

kafka 的几种分配策略