一 消费方式
采用主动拉取模式(PULL)。
因为每个消费者的消费性能不同,不能确认推送速率。
二 消费者工作流程
2.1 消费者工作流程
不同消费组之前是消费隔离的,即每个消费组都消费的是该 topic 全部的信息。
且消费组中多个消费者消费多个 Partition 时,每个分区的数据只能由消费者组中一个消费者消费
2.2 消费者组原理
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组内消费者数量与 Partition 数量关系如下几种:
消费者组初始化流程
coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量)
例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
消费组中消费再平衡
每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也会触发再平衡。
- 心跳超过45s没有同步
- 处理消息时间超过5min
以上两个条件都会导致消费者在消费者组中下线,触发再平衡。
消费者组详细消费流程
消费者发送send后等待onSuccess的回调方法获取数据,获取数据的time有如下机种(类似生产者发送数据)
- Fetch.min.bytes每批次最小抓取大小,默认1字节
- fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms(即等待500ms,没有到批次大小也拉回)
另外注意:
Fetch.max.bytes每批次最大抓取大小,默认50m。
数据抓取到completedFetches队列中后,consumer的FetchedRecords 从队列中抓取数据。默认一次抓取500条。
然后经过反序列化、拦截器等过程。
三 消费者API
3.1 独立消费者案例
public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 连接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5"); // 1 创建一个消费者 "", "hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅主题 first ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 3 消费数据 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } kafkaConsumer.commitAsync(); } }
3.2 独立消费者(订阅分区)
消费者消费指定分区
// 2 订阅主题对应的分区 ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first",0)); kafkaConsumer.assign(topicPartitions);
四 分区分配以及再平衡
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
partition.assignment.strategy = Range、RoundRobin、Sticky、CooperativeSticky。
4.1 Range以及再平衡
原理:针对一个Topic而言
topic数量较多时,容易造成数据倾斜问题。
再平衡案例:
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
4.2 RoundRobin 以及再平衡
原理:RoundRobin 针对集群中所有Topic而言。
4.3 Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
再平衡案例:
在消费者退出消费组后,将该消费组消费的partition尽量均匀的再分布在剩下的消费者上。但是之前已经分配的partition不重新分配。
五 Offset位移
5.1 offset的默认维护位置
从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。(之前在zk中存储)
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact(压缩),也就是每个 group.id+topic+分区号就保留最新数据。
5.2 自动提交offset
kafka提供自动提交offset功能。相关参数见下:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
5.3 手动提交offset
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
// 3 消费数据 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 手动提交offset kafkaConsumer.commitSync(); kafkaConsumer.commitAsync(); }
5.4 指定offset消费
auto.offset.reset = earliest | latest | none 默认是 latest。
- earliest 从最早开始消费
- latest(默认值) 从启动后增量数据开始消费,采用最新的offset
同时还可以通过指定时间进行消费
5.5 漏消费和重复消费
重复消费:已经消费了数据,但是 offset 没提交。(先消费,再提交)
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。(先提交,再消费)
六 消费者事务
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。
七 数据积压(消费者如何提高吞吐量)
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间< 生产速度),使处理的数据小于生产的数据,也会造成数据积压。
从一次最多拉取500条,调整为一次最多拉取1000条
二客户端开发
一般消费逻辑需要以下步骤:
- 配置消费者客户端参数、创建相应的消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
消费者客户端 demo 如下:
public class KafkaConsumerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static final AtomicBoolean isRunning = new AtomicBoolean(true); public static Properties initConfig() { Properties props = new Properties(); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("bootstrap.servers", brokerList); props.put("group.id", groupId); //消费组 id props.put("client.id", "consumer.client.id.demo"); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); try { while (isRunning.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset()); System.out.println("key = " + record.key() + ", value = " + record.value()); //do something to process record. } } } catch (Exception e) { log.error("occur exception ", e); } finally { consumer.close(); } } }
2.1 必要参数配置
参数配置与生产者类似,只有新增一个必填的 group.id 属性。用于标识消费组 id。
2.2 订阅主题与分区
consumer.subscribe(Arrays.asList(topic));
客户端支持消费多个主题。该部分可以是 Collection 的形式订阅,也可以是正则的方式进行订阅。
客户端还可以直接订阅某些主题的特定分区。
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));
同时客户端也支持取消订阅,以下三种方式都可以实现取消订阅的目的。
consumer.unsubscribe(); consumer.subscribe(new ArrayList<String>()); consumer.assign(new ArrayList<TopicPartition>());
三种订阅方式:
- subscribe(Collection):AUTO—TOPICS
- subscribe(Pattern):AUTO—PATTERN
- assign(Collection):USER_ASSIGNED
该三种方式一个消费组中只能使用一种。
2.3 反序列化
同生产者类似,消费者也有其反序列化的部分,可以通过实现 Deserializer 接口来自定义反序列化器。如下:
public class ProtostuffDeserializer implements Deserializer<Company> { public void configure(Map<String, ?> configs, boolean isKey) { } public Company deserialize(String topic, byte[] data) { if (data == null) { return null; } Schema schema = RuntimeSchema.getSchema(Company.class); Company ans = new Company(); ProtostuffIOUtil.mergeFrom(data, ans, schema); return ans; } public void close() { } }
2.4 消息消费
Kafka 的消息消费是poll 拉取模式。
public ConsumerRecords<K, V> poll(Duration timeout)
timeout 为 poll 方法的阻塞时间,没有消息时会进行阻塞。消费到的结构类型为ConsumerRecord,与生产者的 ProducerRecord相对应。
public class ConsumerRecord<K, V> { public static final long NO_TIMESTAMP = -1L; public static final int NULL_SIZE = -1; public static final int NULL_CHECKSUM = -1; private final String topic; private final int partition; private final long offset; private final long timestamp; private final TimestampType timestampType; private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; private final K key; private final V value; private volatile Long checksum; }
poll()返回的数据结构为ConsumerRecords,是多个ConsumerRecord的集合。
2.5 消费位移
对于消息在分区中的位置,使用偏移量表示(offset);而对于消费者消费到的位置,使用位移来表示(offset)。
消费位移包括自动提交和手动提交。
自动提交是在固定的时间周期下提交消费偏移量(默认值5s),可能会造成消息丢失和消息重复的问题。
手动提交又可以分为同步和异步提交。
