Kafka | 五 调优
🌒

Kafka | 五 调优

AI custom autofill
Kafka is a powerful distributed streaming platform for real-time data processing.
Tags
CS
Kafka
Published
March 5, 2024

一 调优

1. Kafka 硬件配置选择

1.1 场景说明

100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。 1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。
每条日志大小:0.5k - 2k(取 1k)。 1150 条/每秒钟 * 1k ≈ 1m/s 。 高峰期每秒钟:1150 条 * 20 倍 = 23000 条。 每秒多少数据量:20MB/s。

1.2 服务器选择

服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1
= 2 * (20m/s * 2 / 100) + 1
= 3 台
建议 3 台服务器。

1.3 磁盘选择

kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。 建议选择普通的机械硬盘。 每天总数据量:1 亿条 * 1k ≈ 100g 100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T 建议三台服务器硬盘总大小,大于等于 1T

1.4 内存选择

Kafka 内存组成:堆内存 + 页缓存
 
1)堆内存:建议每个节点:10g ~ 15g
2)页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中 25%的数据在内存中就好。 每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g 建议服务器内存大于等于 11G。

1.5 cpu选择

num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。
num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。
num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。 建议 32 个 cpu core。

1.6 网络选择

网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。 100Mbps 单位是 bit;10M/s 单位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。 一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。
 

2. 生产者调优

2.1 生产者原理

notion image
参数名称
描述
bootstrap.servers
生产者连接集群所需的 broker 地址清单。 例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer
指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory
RecordAccumulator 缓冲区总大小,默认 32m。
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms
如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。
max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。
retries
当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms
两次重试之间的时间间隔,默认是 100ms。
enable.idempotence
是否开启幂等性,默认 true,开启幂等性。
compression.type
生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.2 生产者如何提高吞吐量

参数名称
描述
buffer.memory
RecordAccumulator 缓冲区总大小,默认 32m。
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms
如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
compression.type
生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.3    数据可靠性

参数名称
描述
acks
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all是等价的。
• 至少一次(At Least Once)= ACK 级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2

2.4 数据去重

参数名称
描述
enable.idempotence
是否开启幂等性,默认 true,表示开启幂等性。
幂等性+事务

2.5 数据有序

单分区内,有序(有条件的,不能乱序);
多分区,分区与分区间无序

2.6 数据乱序

参数名称
描述
enable.idempotence
是否开启幂等性,默认 true,表示开启幂等性。
max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。

3. Kafka Broker

notion image
核心参数
参数名称
描述
replica.lag.time.max.ms
ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出ISR。 该时间阈值,默认 30s。
auto.leader.rebalance.enable
默认是 true。 自动 Leader Partition 平衡。建议关闭。
leader.imbalance.per.broker.percentage
默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds
默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes
Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes
默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours
Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes
Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms
Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms
检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes
默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最的 segment。
log.cleanup.policy
默认是 delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。
num.io.threads
默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers
默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads
默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。
log.flush.interval.messages
强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms
每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。
自动创建主题
如果 broker 端配置参数auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为 1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为 false。

4. 消费者调优

notion image
notion image

4.1 核心参数

参数名称
描述
bootstrap.servers
向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer和value.deserializer
指定接收消息的 key 和 value 的反序列化类型。一定要写全 类名。
group.id
标记消费者所属的消费者组。
enable.auto.commit
默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms
如果设置了 enable.auto.commit 的值为 true, 则该值定义了 消费者偏移量向Kafka 提交的频率,默认 5s。
auto.offset.reset
当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions
consumer_offsets 的分区数,默认是 50 个分区。不建议修改。
heartbeat.interval.ms
Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。不建议修改。
session.timeout.ms
Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes
默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms
默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes
默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records
一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

4.2 消费者再平衡

参数名称
描述
heartbeat.interval.ms
Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms
Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms
消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy
消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可以选择的策略包括: Range 、RoundRobin 、Sticky 、CooperativeSticky

4.3 消费者如何提高吞吐量

增加分区 或者:
参数名称
描述
fetch.max.bytes
默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records
一次 poll 拉取数据返回消息的最大条数,默认是 500 条

5. Kafka总体(重点)

5.1 如何提高吞吐量

  1. 提升生产吞吐量
    1. buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。
    2. batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
    3. linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
    4. compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的CPU 开销。
  1. 增加分区
  1. 消费者提高吞吐量
    1. 调整 fetch.max.bytes 大小,默认是 50m。
    2. 调整 max.poll.records 大小,默认是 500 条。
  1. 增加下游消费者处理能力
 

5.2 数据精准一次

  1. 生产者角度
    1. acks 设置为-1 (acks=-1)。
    2. 幂等性(enable.idempotence = true) + 事务 。
  1. broker 服务端角度
    1. 分区副本大于等于 2 (--replication-factor 2)。
    2. ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。
  1. 消费者
    1. 事务 + 手动提交 offset (enable.auto.commit = false)。
    2. 消费者输出的目的地必须支持事务(MySQL、Kafka)。
    3.  

5.3 合理设置分区数

  • 创建一个只有 1 个分区的 topic。
  • 测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
  • 假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。
  • 然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;分区数 = 100 / 20 = 5 分区
分区数一般设置为:3-10 个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。
 

5.4 单条日志大于 1m

参数名称
描述
message.max.bytes
默认 1m,broker 端接收每个批次消息最大值。
max.request.size
默认 1m,生产者发往 broker 每个请求消息最大值。针对 topic级别设置消息体的大小。
replica.fetch.max.bytes
默认 1m,副本同步数据,每个批次消息最大值。
fetch.max.bytes
默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。

6. 压测

6.1 生产者压测

测试: 1、batch.size=16384 linger.ms=0 9.76 MB/sec 2、batch.size=32768 linger.ms=0 9.76 MB/sec 3、batch.size=4096 linger.ms=0 3.81 MB/sec 4、batch.size=4096 linger.ms=50 3.83 MB/sec 5、batch.size=4096 linger.ms=50 compression.type=snappy 3.77 MB/sec 6、batch.size=4096 linger.ms=50 compression.type=zstd 5.68 MB/sec 7、batch.size=4096 linger.ms=50 compression.type=gzip 5.90 MB/sec 8、batch.size=4096 linger.ms=50 compression.type=lz4 3.72 MB/sec 9、batch.size=4096 linger.ms=50 buffer.memory=67108864 3.76 MB/sec