Kafka | 四 Broker
🌒

Kafka | 四 Broker

AI custom autofill
Kafka | Four Broker: A deep dive into Kafka's architecture and scalability.
Tags
CS
Kafka
Published
March 5, 2024

一 Broker工作流程

notion image
 

二 Kafka副本

2.1 基本信息

  • Kafka默认副本为1,生产环境一般设置为2。太多副本会造成磁盘和网络压力。
  • 副本功能为保证数据可靠性。
  • Kafka分区中所有副本统称为 AR
AR = ISR + OSR
ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
 

2.2 Leader选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
leader选举策略:在isr中存活为前提,按照AR中排在前面的优先
见图:1中的选举流程
 

2.3 Broker故障处理(数据一致性)

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
 
  1. follower故障
notion image
b. leader故障
notion image
 

2.4 分区副本分配

分配原则:尽可能负载均衡,且leader不再同一Broker上。
分配方案:按Broker列表进行主-从-从分配,每个partition中间空两个Broker。且在分配完Broker.size个partition后,新的partition分配时进行主-空-从-从方式分配。不断加入空位,避免刚好将主节点分配在同一Broker上。
 

手动分配

如果服务器之间容量不同,可以通过手动方式来平衡。
 
(1)创建一个新的 topic,名称为 three。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入如下内容:
{ "version":1, "partitions":[{"topic":"three","partition":0,"replicas":[0,1]}, {"topic":"three","partition":1,"replicas":[0,1]}, {"topic":"three","partition":2,"replicas":[1,0]}, {"topic":"three","partition":3,"replicas":[1,0]}] }
(4)执行副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
(6)查看分区副本存储情况。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
 

负载均衡

Leader partition 自动平衡
一般生产中不开启,再平衡需要较长时间,影响性能。
notion image
 
 

三 文件存储

3.1 存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。
Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment(1G)。
每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
notion image
 
log文件和index文件详解(kafka数据索引-稀疏索引)
notion image
注意:
1.index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
2.Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小
notion image
 
index文件:
notion image
log文件:
notion image
 

3.2 文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
判断数据可以根据.timeindex文件判断
  • log.retention.hours,最低优先级小时,默认 7 天。
  • log.retention.minutes,分钟。
 
数据超期清理策略有 delete 和 compact 两种

delete 日志删除:将过期数据删除

log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。 即最晚的数据。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
配置:log.retention.bytes,默认等于-1,表示无穷大。
 

compact 日志压缩

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
 

四 高效读写数据

1. Kafka 本身是分布式集群,可以采用分区技术,并行度高

2. 读数据采用稀疏索引,可以快速定位要消费的数据

3. 顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4. 页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用。