kafka相关思考


1. Kafka 中的 ISR (InSyncRepli)、OSR (OutSyncRepli)、AR (AllRepli) 代表什么?

  • AR (All Replicas):表示一个分区(Partition)中所有的副本。这包括 Leader 副本和所有的 Follower 副本。AR = ISR + OSR。
  • ISR (In-Sync Replicas):表示与 Leader 副本保持同步的副本集合。这个集合里的副本,其 LEO(Log End Offset,后面会解释)与 Leader 的 LEO 之间的差距在设定的阈值 (replica.lag.time.max.ms) 之内。只有 ISR 中的副本才有可能被选举为新的 Leader,以保证数据不丢失。
  • OSR (Out-of-Sync Replicas):表示未与 Leader 副本保持同步的副本集合。这些副本的 LEO 已经落后 Leader 太多(超过 replica.lag.time.max.ms),或者已经停止工作。它们无法被选举为 Leader,因为它们的数据不是最新的,如果被选为 Leader 会导致数据丢失。

2. Kafka 中的 HW、LEO 等分别代表什么?

  • LEO (Log End Offset)日志末端偏移量。LEO 是 Kafka 日志文件中下一条待写入消息的偏移量。对于 Leader 副本,LEO 代表它已写入的所有消息的下一条偏移量;对于 Follower 副本,LEO 代表它已从 Leader 复制并写入到本地日志中的下一条消息的偏移量。
  • HW (High Watermark)高水位HW 是一个分区的 Leader 副本和所有 ISR 集合中的 Follower 副本都已成功提交的最小偏移量
    • 生产者可见性:生产者发送的消息,只有当其偏移量小于或等于 HW 时,才会被消费者可见。这确保了消息的持久性和一致性:消息在所有 ISR 副本上都已同步,即使 Leader 宕机,也不会丢失。
    • 消费者可见性:消费者只能消费到小于或等于 HW 的消息。
  • 其他相关概念:
    • MinISR (Minimum In-Sync Replicas):一个分区正常工作所需的最小 ISR 副本数。如果 ISR 数量少于这个值,分区将无法写入消息(Kafka 1.0 版本后可以通过 unclean.leader.election.enable 配置决定是否允许 OSR 选举)。

3. Kafka 中是怎么体现消息顺序性的?

Kafka 保证同一个分区内的消息是有序的

  • 生产者发送: 生产者发送到同一个分区的消息,会按照发送的顺序写入日志,并被赋予递增的偏移量。
  • 消费者消费: 消费者从同一个分区消费消息时,会严格按照这些消息写入的顺序进行消费。
  • 跨分区不保证: Kafka 不保证跨分区的消息顺序。如果你将同一个 Topic 的消息发送到不同的分区,它们在消费者端接收到的顺序可能是乱的。
  • 保证顺序性的关键: 如果业务上需要严格保证一组消息的全局顺序性,你需要将这些消息发送到同一个分区。通常可以通过指定消息的 Key 来实现,因为 Kafka 会根据 Key 的哈希值将消息路由到特定分区。

4. Kafka 中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

是的,它们是 Kafka 生产者客户端的重要组成部分。

  • 分区器 (Partitioner)
    • 作用:决定生产者发送的每条消息应该被发送到 Topic 的哪个分区。
    • 默认实现:默认分区器会根据消息的 Key 进行哈希,然后取模计算分区。如果消息没有 Key,则轮询发送到不同分区。
    • 自定义:你可以实现 org.apache.kafka.clients.producer.Partitioner 接口来自定义分区逻辑。
  • 序列化器 (Serializer)
    • 作用:将 Java 对象(如消息的 Key 和 Value)转换为字节数组,因为 Kafka 只能传输字节数据。
    • 默认提供:Kafka 提供了常用的序列化器,如 StringSerializerIntegerSerializerByteArraySerializer 等。
    • 自定义:你可以实现 org.apache.kafka.common.serialization.Serializer 接口来自定义序列化逻辑。
  • 拦截器 (Interceptor)
    • 作用:允许用户在消息发送前和发送结果返回后对消息进行拦截、修改或记录。
    • 分为两类
      • ProducerInterceptor:生产者拦截器。
      • ConsumerInterceptor:消费者拦截器。
    • 链式调用:可以配置多个拦截器,它们会形成一个拦截器链。

它们之间的处理顺序 (针对生产者客户端):

  1. 序列化器 (Serializer):最先执行。当生产者准备发送一条消息时,首先会使用 Key 和 Value 的序列化器将其转换为字节数组。
  2. 分区器 (Partitioner):紧接着序列化器执行。消息被序列化为字节数组后,分区器会根据 Key(或无 Key 时的轮询)计算出消息应该发送到的目标分区。
  3. 生产者拦截器 (ProducerInterceptor)
    • onSend() 方法:在消息被序列化和确定分区之后,但在消息被发送到 Kafka 之前调用。你可以在这里修改消息(例如添加 Header)、过滤消息或记录日志。
    • onAcknowledgement() 方法:在消息被 Kafka Broker 成功接收(或发送失败)并收到确认之后调用。你可以在这里记录发送结果、统计成功率等。
    • 顺序:如果有多个拦截器,onSend() 会按照配置顺序依次调用,onAcknowledgement() 则会按照配置顺序的逆序调用。

5. Kafka 生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?

Kafka 生产者客户端的整体结构主要由以下几个核心组件和线程组成:

整体结构:

  1. 主线程 (调用线程):你的应用程序代码运行的线程,负责调用 producer.send() 方法来发送消息。
  2. Producer Record (消息):你通过 producer.send() 发送的实际消息对象,包含 Topic、可选的 Key、Value、分区、时间戳等信息。
  3. RecordAccumulator (消息累加器/缓冲池):一个线程安全的缓冲池,用于存储未发送的消息。producer.send() 方法会将消息先写入到这里。消息会根据目标分区被分组,每个分区对应一个双端队列 (Deque)。
  4. Sender 线程 (后台发送线程):一个独立的后台线程,负责从 RecordAccumulator 中取出消息批次 (RecordBatch),并将其发送给 Kafka Broker。
  5. NetworkClient (网络客户端):负责与 Kafka Broker 进行实际的网络通信(建立连接、发送请求、接收响应)。
  6. Metadata (元数据):存储 Topic 和分区信息,以及 Broker 列表等。NetworkClient 会定期更新这些元数据。

使用了几个线程来处理?分别是什么?

主要使用了两个核心线程

  1. 主线程 (Main Thread / User Thread)

    • 职责:执行应用程序代码,包括创建 KafkaProducer 实例、调用 producer.send() 方法。
    • 特点:send() 方法是一个异步操作,它将消息放入缓冲池后就立即返回,不会阻塞主线程。
  2. Sender 线程 (Background Thread)

    • 职责:这是

      1
      KafkaProducer

      内部启动的一个后台守护线程它负责:

      • RecordAccumulator批量获取消息(RecordBatch)。
      • 将这些批次通过 NetworkClient 发送到对应的 Kafka Broker。
      • 处理来自 Broker 的响应(ACKs),包括成功提交的确认或错误信息。
      • 调用用户提供的回调函数 (Callback) 或生产者拦截器的 onAcknowledgement() 方法。
      • 刷新元数据
    • 特点:这是一个单线程,负责所有分区的消息发送。它不断循环地从消息队列中拉取消息并发送。

总结图示:

1
2
3
4
5
6
7
8
9
10
User Application Thread  ---send()---> RecordAccumulator (Buffer)
|
V
Sender Thread (Background)
|
V
NetworkClient (I/O)
|
V
Kafka Brokers

6.“消费组中的消费者个数如果超过 topic 的分区,那么就会有消费者消费不到数据”这句话是否正确?

正确。

  • 分区是消费的最小并行单位: Kafka 中一个分区在任意给定时间只能被一个消费组内的一个消费者实例消费。这是为了保证分区内的消息顺序性。
  • 消费者与分区的关系:
    • 如果消费者数量少于分区数量:一个消费者会消费多个分区。
    • 如果消费者数量等于分区数量:每个消费者大致消费一个分区(理想情况下)。
    • 如果消费者数量多于分区数量:有些消费者将分配不到任何分区。它们将处于空闲状态,无法消费任何数据。

所以,为了达到最佳的消费并行度,通常建议将消费组中的消费者数量设置得等于或略小于 Topic 的分区数量。


7. 消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1?

消费者提交消费位移时提交的是下一条待消费消息的 offset,也就是**当前消费到的最新消息的 offset + 1**。

  • 为什么要 +1? 偏移量 (offset) 代表的是消息在分区内的位置。当消费者提交 X 作为其消费位移时,Kafka 认为消费者已经成功消费了所有偏移量小于 X 的消息,并且下次开始消费时应该从偏移量 X 的消息开始。
  • 例子: 如果你消费了一条偏移量为 100 的消息,并成功处理了它,那么你应该提交的位移是 101。这表示你已经消费到 100,下次从 101 开始。

8. 有哪些情形会造成重复消费?

重复消费是 Kafka 消费者可能面临的一个问题,通常发生在**“at least once”**的语义下。常见的情形有:

  1. 消费者提交位移失败:
    • 消费者成功处理了消息,但在提交位移到 Kafka 或外部存储(如 ZooKeeper、数据库)时发生网络抖动、宕机等错误,导致位移提交失败。当消费者重启或分区被重新分配给其他消费者时,会从上次成功提交的位移处开始消费,从而再次消费到之前已处理过的消息。
  2. 消费者进程崩溃/重启:
    • 消费者在处理消息的过程中或处理完成后(但在位移提交前)崩溃或被强制终止。当它重启或其分区被其他消费者接管时,由于位移未提交,会从上一次提交的位移开始重新消费。
  3. 网络分区/Rebalance 期间:
    • 在消费者组 Rebalance 期间,如果旧的消费者提交位移太慢或失败,而新的消费者已经开始消费,就可能导致消息被多个消费者重复消费。
  4. 消费业务逻辑异常后未提交位移:
    • 消费者从 Kafka 拉取到一批消息,但在处理这些消息的业务逻辑中途发生异常,并且没有正确地捕获异常并提交位移。下一次拉取时,这些消息会再次被拉取到。
  5. 位移错乱/手动设置错误:
    • 开发者或运维人员手动调整了消费者组的位移到一个较旧的值。
    • 消费位移被非法篡改或存储的位移信息损坏。

解决重复消费的方案: 业务代码层面实现幂等性。即无论一条消息被处理多少次,其最终结果都是一致的。例如,使用消息的唯一 ID 进行去重。


9. 哪些情景会造成消息漏消费?

消息漏消费是更严重的问题,通常发生在**“at most once”**的语义下,这意味着消息可能丢失。常见的情形有:

  1. 消费者先提交位移后处理消息:
    • 消费者拉取到消息后,优先提交了位移,然后才开始处理这些消息。如果在这个处理过程中消费者崩溃、断电或发生其他异常,导致消息未能处理完成,那么由于位移已经提交,下次再消费时将从已提交的位移之后开始,之前未处理的消息就丢失了。
  2. 消息处理过程中发生不可恢复的异常:
    • 消费者成功拉取并开始处理消息,但在业务逻辑处理过程中发生致命错误(例如,数据格式错误导致无法解析,或依赖的外部服务不可用),导致该批次消息无法被正确处理,并且消费者跳过了这些消息(或者被强制关闭,而其位移已经提前或自动提交)。
  3. 自动提交位移(enable.auto.commit=true)间隔过大或时机不当:
    • 如果开启了自动提交位移,并且提交间隔较大。在这段时间内,消费者拉取并处理了一部分消息,但自动提交尚未触发,此时消费者崩溃,那么这批已处理但未提交位移的消息就会丢失。
  4. acks 配置不当:
    • 生产者配置 acks=0(不等待任何确认)或 acks=1(只等待 Leader 确认)。在这种情况下,如果 Leader 收到消息后立即崩溃,而 Follower 还没来得及同步,新选举的 Leader 可能不包含该消息,导致消息丢失。
  5. 生产者发送后未确认:
    • 生产者发送消息后,没有检查返回的 Future 对象,或者没有正确处理发送结果(例如,网络错误、Broker 故障导致消息未能成功写入)。
  6. 消费者被强制退出/被 Kill -9:
    • 消费者进程没有优雅关闭,导致其没有机会提交最终的消费位移。

解决漏消费的方案:

  • 将消费者提交位移的策略设置为**“先处理消息,后提交位移”**。
  • 生产者设置 acks=-1 (或 all),并结合 retries 重试机制。
  • 业务逻辑中对消息处理失败进行重试或降级处理,而不是简单跳过。
  • 使用事务性的生产者和消费者来保证Exactly Once语义(Kafka 0.11+)。

10. 当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?

当你使用 kafka-topics.sh 命令创建(或删除)一个 Topic 时,Kafka 内部会经历一个协调过程,主要涉及 ZooKeeperController Broker

创建 Topic 的逻辑:

  1. kafka-topics.sh 发送请求给任一 Broker: 客户端(kafka-topics.sh)连接到集群中的任何一个 Broker,并发送创建 Topic 的请求。
  2. 请求被转发给 Controller: 这个 Broker 会将请求转发给当前的 Kafka Controller
  3. Controller 在 ZooKeeper 中创建 znode:
    • Controller 负责实际的 Topic 创建工作。它会在 ZooKeeper 的 /brokers/topics 路径下创建一个新的临时 znode,例如 /brokers/topics/your_topic_name
    • 这个 znode 中包含了 Topic 的元数据,如分区数、副本因子等信息。
  4. ZooKeeper 触发 Controller 监听:
    • 由于 Controller 会对 ZooKeeper 中与 Topic 相关的路径设置监听器(Watcher),当 /brokers/topics 路径下有新 znode 创建时,Controller 会收到 ZooKeeper 的通知。
  5. Controller 分配分区 Leader 和 Follower:
    • Controller 接收到通知后,开始执行 Topic 的创建逻辑。它会根据配置的分区数和副本因子,决定每个分区的 Leader 副本和 Follower 副本分别应该落在哪些 Broker 上。这个信息也会被写入 ZooKeeper。
  6. Controller 发送 LeaderAndIsr 请求给相关 Broker:
    • Controller 会向每个分配到该 Topic 分区副本的 Broker 发送 LeaderAndIsr 请求
    • 这些 Broker 收到请求后,会在本地创建相应的分区日志目录和文件,并初始化其作为 Leader 或 Follower 的角色。
  7. Broker 更新本地元数据并响应 Controller:
    • Broker 完成分区创建后,会更新自己的本地元数据缓存,并向 Controller 返回响应。
  8. Controller 更新自身和集群的元数据缓存:
    • Controller 收到所有 Broker 的响应后,会更新自身维护的集群元数据缓存,并广播给集群中的所有 Broker,使得所有 Broker 的元数据都保持一致。

删除 Topic 的逻辑:

删除 Topic 的过程类似,但方向相反。kafka-topics.sh 客户端发送删除请求给任意 Broker,请求转发给 Controller。Controller 会在 ZooKeeper 的 /admin/delete_topics 路径下创建一个待删除 Topic 的 znode。Controller 的监听器感知到这个 znode 后,会向相关 Broker 发送停止服务和删除分区的请求,Broker 删除本地日志文件,最后 Controller 会移除 ZooKeeper 中的相关元数据。


11. Topic 的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

Topic 的分区数可以增加。

  • 如何增加:

    你可以使用 kafka-topics.sh 命令来增加一个 Topic 的分区数。

    1
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic your_topic_name --partitions <new_total_partitions>

    其中,<new_total_partitions> 必须是大于当前分区数的新分区总数。例如,如果当前是 3 个分区,你想增加到 5 个,那么 <new_total_partitions> 就是 5。

  • 增加原理:

    • 当执行增加分区命令后,请求会发给 Controller。
    • Controller 会更新 ZooKeeper 中的 Topic 元数据。
    • 然后,Controller 会向所有 Broker 发送 UpdateMetadata 请求,通知它们新的分区布局。
    • 受影响的 Broker 会在本地创建新的分区目录和文件。
    • 新的分区一开始没有历史数据,也不会有历史消息。只有当生产者开始向这些新分区发送消息时,它们才会开始写入数据。
  • 注意事项:

    • 增加分区通常是在线操作,不会中断服务。
    • 增加分区不会导致现有数据的重新分布。旧分区的数据仍在旧分区中,新数据会根据分区器分布到新旧分区中。
    • 增加分区后,分区器行为会改变。如果你的生产者使用默认分区器(基于 Key 的哈希),那么增加分区后,相同的 Key 可能会被分配到不同的分区,这会破坏基于 Key 的消息顺序性。你需要谨慎处理这种情况,可能需要调整生产者逻辑或接受新的分区分配。
    • 增加分区后,消费者组可能需要进行 Rebalance,以确保所有分区都被消费者消费。

12. Topic 的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

Topic 的分区数不可以直接减少。

  • 为什么不可以?

    • 数据丢失风险: 减少分区意味着删除现有的分区。如果这些分区中有未消费的消息,或者即使已消费但需要保留历史数据,这些消息都会被永久删除。Kafka 的设计哲学是尽可能保证数据不丢失。
    • 消息顺序性问题: 如果强行减少分区,并尝试将旧分区的消息合并到新分区,那么原先在不同分区中、各自有序但整体无序的消息可能会被强制排序,从而破坏原有分区的顺序性保证。这会使系统的行为变得不可预测。
    • 实现复杂性: 在保证数据不丢失和顺序性的前提下,实现分区缩减的逻辑非常复杂,可能需要大量的数据迁移和元数据更新,且难以做到无缝在线进行。
  • 替代方案(曲线救国):

    虽然不能直接减少,但如果你真的需要更少的分区,可以考虑以下“曲线救国”的方案:

    1. 创建新 Topic: 创建一个分区数符合要求的新 Topic。
    2. 迁移数据: 编写一个程序,将旧 Topic 中的所有数据逐条读取并写入到新的 Topic 中。这个过程需要小心处理消息的顺序性和幂等性。
    3. 切换生产者和消费者: 将所有生产者和消费者切换到新的 Topic 上。
    4. 删除旧 Topic: 确认所有数据已迁移且服务已稳定运行在新 Topic 上后,再删除旧 Topic。

这种方法虽然繁琐,但能确保数据不丢失和顺序性不被破坏。


13. Kafka 有内部的 topic 吗?如果有是什么?有什么作用?

**是的,Kafka 有内部 Topic。**最常见的内部 Topic 是:

  1. __consumer_offsets
    • 作用:用于存储消费者组的消费位移 (offset)
    • 为什么需要:在 Kafka 0.8.2 版本之前,消费位移是存储在 ZooKeeper 中的。但 ZooKeeper 不擅长高并发的写入,且位移提交是高频操作,容易成为性能瓶颈。Kafka 从 0.8.2 版本开始,将消费位移的存储从 ZooKeeper 迁移到了 Kafka 自身的内部 Topic __consumer_offsets 中。
    • 特点
      • 这个 Topic 是自动创建的,默认是 50 个分区(可通过 offsets.topic.num.partitions 配置)且副本因子为 3(可通过 offsets.topic.replication.factor 配置)。
      • 消息的 Key 是 group.id + topic + partition,Value 是对应的位移信息。
      • 消费位移数据会进行压缩周期性清理(通过 Log Compaction)。
      • 消费者组提交位移时,实际上就是向这个 Topic 发送一条消息。
    • 重要性:它是实现消费者组高可用和消费位移持久化的核心机制。
  2. __transaction_state (或 __transaction_coordinator_state 在早期版本中):
    • 作用:用于存储事务状态
    • 为什么需要:从 Kafka 0.11 版本开始引入了事务(Exactly Once 语义)。事务协调器 (Transaction Coordinator) 需要持久化事务的状态,例如事务的 ID、状态(开启、提交、中止)以及包含的生产者 ID 和分区信息等。这些信息就存储在这个内部 Topic 中。
    • 特点:同样是自动创建和管理的。
  3. __cluster_metadata (Kafka 2.8+,KIP-500/KRaft 模式下):
    • 作用:在 KRaft 模式下(移除了对 ZooKeeper 的依赖),这个 Topic 用于存储 Kafka 集群的所有元数据,包括 Broker 信息、Topic 信息(分区、副本、ISR 等)、控制器信息、用户配额等。
    • 特点:它是 KRaft 模式下取代 ZooKeeper 的核心。

总结:这些内部 Topic 是 Kafka 集群正常运行的基石,它们使得 Kafka 能够高效地管理消费者位移、事务状态和集群元数据,从而实现其高可用和强大的功能。它们通常对用户是透明的,不需要手动管理。


14. Kafka 分区分配的概念?

Kafka 分区分配是指在消费者组 (Consumer Group) 内,将一个或多个 Topic 的所有分区,分配给组内各个消费者实例的过程。这个过程的目标是确保每个分区只被组内的一个消费者消费,并且尽可能地实现负载均衡。

  • 分配时机:

    分区分配发生在消费者组进行 Rebalance (再平衡)时。Rebalance 在以下几种情况会触发:

    1. 新消费者加入消费组。
    2. 消费者离开消费组。 (正常关闭或非正常崩溃)
    3. Topic 分区数增加或减少。 (虽然分区不能减少,但可以增加)
    4. 消费者组订阅的 Topic 发生变化。
    5. 消费者心跳超时。
  • 分配流程 (高层):

    1. 当触发 Rebalance 时,消费者组会选举出一个 Group Coordinator (通常是某个 Broker)。
    2. Group Coordinator 负责协调 Rebalance 过程,并选举出组内的一个 Leader Consumer
    3. Leader Consumer 根据所配置的分区分配策略,生成一个分区到消费者的分配方案。
    4. Leader Consumer 将分配方案提交给 Group Coordinator。
    5. Group Coordinator 将这个方案广播给组内所有消费者。
    6. 每个消费者根据分配方案,开始消费自己被分配到的分区。
  • 分区分配策略 (Partition Assignment Strategy):

    Kafka 提供了多种内置的分区分配策略,你可以通过消费者参数

    1
    partition.assignment.strategy

    进行配置:

    • RangeAssignor (范围分配策略):默认策略。按 Topic 排序,然后将每个 Topic 的分区按范围(partitionId)分配给消费者。例如,Topic A 有 10 个分区,3 个消费者。消费者 1 可能分到 A-0, A-1, A-2, A-3;消费者 2 分到 A-4, A-5, A-6;消费者 3 分到 A-7, A-8, A-9。这种策略可能导致某些消费者分配到的分区数不均匀,尤其是在分区数不能被消费者数整除时。
    • RoundRobinAssignor (轮询分配策略):将所有 Topic 的所有分区“混合”在一起,然后轮询分配给消费者。它通常能实现更均匀的分区分配,但可能导致单个 Topic 的分区不连续。
    • StickyAssignor (粘性分配策略):从 Kafka 0.11 版本引入。它在 Rebalance 时,力求在保持均衡的同时,尽可能地保留之前分配给消费者的分区。这减少了不必要的分区迁移,降低了 Rebalance 的开销。它是目前推荐使用的策略,因为它在保持负载均衡的同时,最小化了分区移动。
    • CooperativeStickyAssignor (协作式粘性分配策略):从 Kafka 2.4 版本引入,是 StickyAssignor 的增强版,支持增量式 Rebalance (Incremental Rebalance)。它允许消费者在 Rebalance 过程中继续消费部分分区,减少了 Rebalance 停顿时间,提高了可用性。
  • 自定义分配策略: 你也可以通过实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口来定义自己的分区分配策略。


15. 简述 Kafka 的日志目录结构?

Kafka Broker 在其配置的日志目录(通过 log.dirs 配置,可以配置多个目录,用逗号分隔)下,会为每个 Topic 的每个分区创建独立的子目录来存储消息数据。

典型的日志目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
/opt/kafka/data/  <-- log.dirs 配置的根目录之一
├── topic1-0/ <-- topic1 的 partition 0
│ ├── 00000000000000000000.log <-- 日志段文件
│ ├── 00000000000000000000.index <-- 偏移量索引文件
│ ├── 00000000000000000000.timeindex <-- 时间戳索引文件
│ └── leader-epoch-checkpoint <-- Leader Epoch 检查点文件
├── topic1-1/ <-- topic1 的 partition 1
│ ├── ...
├── topic2-0/ <-- topic2 的 partition 0
│ ├── ...
└── __consumer_offsets-0/ <-- 内部 Topic 的 partition 0
├── ...

目录及文件说明:

  1. Topic-Partition 目录 (<topic_name>-<partition_id>/):
    • 每个 Topic 的每个分区(无论是 Leader 还是 Follower 副本)都会在 log.dirs 指定的目录下拥有一个独立的子目录。例如,my_topic-0 代表 my_topic 的第 0 个分区。
    • 这个目录就是存储该分区所有日志数据和索引文件的位置。
  2. 日志段文件 (.log 文件):
    • 00000000000000000000.log
    • 这是实际存储消息数据的文件。消息是追加写入到这些文件中。
    • 当一个日志段文件达到一定大小(由 log.segment.bytes 配置,默认 1GB)或达到一定时间(由 log.roll.mslog.roll.hours 配置)时,会进行日志分段(Log Roll),创建一个新的 .log 文件。
    • 文件名中的数字(例如 00000000000000000000)表示该日志段中第一条消息的起始偏移量 (base offset)
    • 这是 Kafka 高性能的核心之一:顺序读写磁盘,利用了操作系统的页缓存。
  3. 偏移量索引文件 (.index 文件):
    • 00000000000000000000.index
    • 用于存储消息的稀疏索引。它映射逻辑偏移量 (relative offset) 到消息在 .log 文件中的物理位置 (position)
    • 当消费者请求某个偏移量的消息时,Kafka 会先在这个 .index 文件中查找,快速定位到 .log 文件中的大致位置,然后进行顺序读取。
  4. 时间戳索引文件 (.timeindex 文件):
    • 00000000000000000000.timeindex
    • 用于存储时间戳到偏移量的映射。
    • 主要用于按时间查找消息(例如,consumer.seek(timestamp))。它同样是稀疏索引,映射时间戳到对应的偏移量。
  5. leader-epoch-checkpoint 文件:
    • 这是一个文本文件,记录了 Leader 副本的纪元 (Epoch) 信息。
    • 每当一个分区的 Leader 发生变化时,就会创建一个新的 Leader Epoch。这个文件帮助 Broker 在 Leader 选举和恢复时保持一致性。
  6. replication-offset-checkpoint 文件:
    • 记录了 Follower 副本从 Leader 复制的最新偏移量信息,用于恢复 Follower 的复制进度。

16. 如果我指定了一个 offset,Kafka Controller 怎么查找到对应的消息?

Kafka Controller 不直接负责查找到对应的消息,它的主要职责是集群元数据管理和 Leader 选举。查找到对应消息是 Broker 的职责。

当一个消费者指定一个偏移量 offset 来查找消息时(例如通过 consumer.seek(partition, offset)),其流程大致如下:

  1. 消费者发送 FetchRequest 给 Leader Broker: 消费者客户端会向指定分区(Partition)的当前 Leader 副本所在的 Broker 发送一个 FetchRequest,请求从给定 offset 开始的消息。

  2. Leader Broker 接收请求并处理:

    目标 Broker 收到

    1
    FetchRequest

    后,会执行以下步骤来查找消息:

    • 确定目标日志段: Broker 会利用分区目录下的 .index 文件(偏移量索引)来快速定位包含指定 offset 的日志段(.log 文件)。日志段的文件名就是其起始偏移量。
    • 在日志段中查找: 一旦确定了目标 .log 文件,Broker 会在该文件的 .index 索引文件中查找与指定 offset 最接近且不大于 offset 的索引条目。这个索引条目会告诉 Broker 在 .log 文件中的物理位置
    • 顺序读取消息: 从找到的物理位置开始,Broker 会在 .log 文件中顺序读取消息数据,直到达到请求的最大字节数或读取到指定 offset 的消息(包括它自己)及其后续消息。
    • 返回消息给消费者: Broker 将读取到的消息数据返回给消费者客户端。

总结:

  • Kafka Controller: 负责整个集群的元数据(Topics、分区、副本、Broker 信息)管理和 Leader 选举。它知道哪个 Broker 是哪个分区的 Leader。
  • Leader Broker: 负责存储分区的实际消息数据。当需要查找特定偏移量的消息时,是由该分区的 Leader Broker 来完成的。它通过日志段文件.log)和偏移量索引文件.index)来高效地定位和读取消息。

17. 聊一聊 Kafka Controller 的作用?

Kafka Controller 是 Kafka 集群中一个非常重要的角色,它是集群的大脑和管理者。在一个 Kafka 集群中,只有一个 Broker 会被选举为 Controller,其他 Broker 都是普通的 Follower Broker。

Controller 的主要作用和职责包括:

  1. Leader 选举 (Leader Election):
    • 这是 Controller 最核心的功能。当一个分区的 Leader 副本宕机或不可用时,Controller 会负责从该分区的 ISR 列表中选举出新的 Leader。
    • 它会更新 ZooKeeper 中的 Leader 和 ISR 信息,并通知所有相关 Broker。
  2. 集群元数据管理和同步 (Metadata Management and Synchronization):
    • Controller 维护着集群中所有 Broker、Topic、分区、副本、消费者组的元数据信息。
    • 它负责将这些元数据的最新状态同步给集群中的所有 Broker,确保所有 Broker 都拥有最新的集群视图。
  3. Topic 管理 (Topic Management):
    • 创建/删除 Topic: 当用户创建或删除 Topic 时,Controller 负责实际执行这些操作,包括在 ZooKeeper 中创建/删除相应的 znode,并在所有相关 Broker 上创建/删除分区目录。
    • 增加分区: 当 Topic 增加分区时,Controller 负责分配新分区,并通知相关 Broker 创建。
  4. 副本状态管理 (Replica State Management):
    • Controller 负责监控所有分区的副本状态(Leader、Follower、In-Sync、Out-of-Sync)。
    • 当 Follower 副本与 Leader 失去同步时(落后太多),Controller 会将其从 ISR 列表中移除。当 Follower 追上 Leader 时,Controller 会将其重新加回 ISR。
  5. Broker 故障检测和处理 (Broker Failure Detection and Handling):
    • Controller 会在 ZooKeeper 上监听 /brokers/ids 路径。当有 Broker 上线或下线时,Controller 会收到通知。
    • 如果一个 Broker 宕机,Controller 会检测到,并:
      • 将该 Broker 上所有的 Leader 副本转移到其他健康的 Broker 上(Leader 选举)。
      • 将该 Broker 上所有的 Follower 副本标记为不可用。
      • 在 Broker 恢复上线后,Controller 会帮助其进行数据同步和副本状态恢复。
  6. 分区重分配 (Partition Reassignment):
    • 当用户手动执行分区重分配(例如为了负载均衡或 Broker 扩容)时,Controller 负责协调整个重分配过程,确保数据平稳迁移。
  7. Preferred Leader 选举 (Preferred Leader Election):
    • 为了均衡负载,Controller 会周期性地检查并尝试将分区的 Leader 恢复到 Preferred Leader(在 AR 列表中排名第一的副本),前提是 Preferred Leader 也在 ISR 中。

Controller 的选举:

Controller 是通过在 ZooKeeper 上创建临时节点来选举产生的。第一个成功创建 /controller znode 的 Broker 就会成为当前的 Controller。如果当前的 Controller 宕机,这个 znode 会消失,其他 Broker 会竞争创建新的 znode,从而选举出新的 Controller。

总结:Controller 是 Kafka 集群高可用和自动化管理的核心组件。它的存在使得 Kafka 能够在 Broker 故障、元数据变更等情况下保持稳定运行和数据一致性。


18. Kafka 中有那些地方需要选举?这些地方的选举策略又有哪些?

Kafka 中主要有以下几个地方需要选举:

  1. Controller 选举:
    • 作用: 选举出整个 Kafka 集群的唯一大脑,负责集群元数据管理和 Leader 选举。
    • 选举策略:
      • 基于 ZooKeeper 的公平选举: 所有 Broker 都会尝试在 ZooKeeper 的 /controller 路径下创建一个临时(Ephemeral)znode。ZooKeeper 保证只有一个 Broker 能成功创建。第一个成功创建的 Broker 就成为 Controller。
      • Leader Epoch: 为了防止脑裂(Split-Brain)问题,Controller 每次选举成功后会递增一个 Leader Epoch ID,并将其持久化。所有 Broker 在与 Controller 交互时都会带上这个 Epoch ID,旧的 Controller 会被新的 Epoch 拒绝。
    • 时机: Kafka 集群启动时;当前 Controller 宕机时。
  2. 分区 Leader 选举:
    • 作用: 为每个分区选择一个 Leader 副本,所有生产和消费请求都通过 Leader 副本进行。
    • 选举策略:
      • 基于 Controller 的 ISR 选举(默认): 当分区的 Leader 副本宕机时,Controller 会从该分区的 ISR (In-Sync Replicas) 列表中选择第一个健康的副本作为新的 Leader。这是最安全的策略,因为它只从同步副本中选择,保证数据不丢失。
      • 非同步副本选举(Unclean Leader Election):
        • 可以通过配置 unclean.leader.election.enable=true 启用。
        • 在这种情况下,如果 ISR 中没有可用的副本,Controller 会从 OSR (Out-of-Sync Replicas) 甚至所有 AR (All Replicas) 中选择一个副本作为 Leader。
        • 风险: 启用此选项可能会导致数据丢失,因为 OSR 副本可能不包含 Leader 之前的所有消息。但它可以在极端情况下(所有 ISR 副本都宕机)提高可用性,避免分区长时间不可用。
      • Preferred Leader Election (优选 Leader 选举):
        • 为了负载均衡,Controller 会定期(或手动触发)检查每个分区的 AR 列表中的第一个副本是否是当前 Leader,并且是否在 ISR 中。如果是,Controller 会尝试将其选举为 Leader。
        • 这个策略旨在将 Leader 分布到所有 Broker 上,避免某些 Broker 负载过重。
    • 时机: 分区 Leader 宕机;分区 Leader 所在的 Broker 宕机;执行 kafka-preferred-replica-election.sh 命令;Broker 重启后。
  3. 消费者组 Leader 选举 (Group Coordinator/Leader Consumer):
    • Group Coordinator (协调器):
      • 作用: 协调整个消费者组的 Rebalance 过程,并存储消费者组的位移信息。
      • 选举策略: 消费者组的每个消费者会根据 group.id 的哈希值,连接到一个特定的 Broker 作为其 Group Coordinator。这个 Broker 会在启动时被选举出来(通过 Broker ID 的哈希值等)。
      • 时机: 消费者组第一次启动时;当前 Group Coordinator 宕机时。
    • Leader Consumer (组内协调者):
      • 作用: 在一次 Rebalance 中,由它负责根据分配策略生成分区分配方案。
      • 选举策略: Group Coordinator 会选择组内第一个加入的消费者,或者某个特定的消费者作为 Leader Consumer。
      • 时机: 每次 Rebalance 开始时。

19. 失效副本是指什么?有那些应对措施?

失效副本 (Offline Replica 或 Out-of-Sync Replica) 通常指以下几种情况的副本:

  1. 处于 OSR 列表中的副本: 这是最常见的“失效副本”情况。指 Follower 副本的 LEO 已经远远落后于 Leader 副本的 LEO(超过 replica.lag.time.max.ms 配置的时间),或者该 Follower 副本所在的 Broker 已经宕机。Controller 会将这些副本从 ISR 列表中移除。
  2. 副本所在的 Broker 宕机: 如果某个 Broker 宕机,那么它上面承载的所有分区副本都会变为“失效”状态。如果这些副本是 Leader,会触发 Leader 选举;如果是 Follower,会从 ISR 中移除。
  3. 副本自身故障: 比如磁盘损坏、网络隔离等,导致副本无法正常与 Leader 同步。

应对措施:

  1. Leader 选举 (自动处理):
    • 如果失效副本是 Leader,Controller 会立即从其 ISR 中选举出新的 Leader,确保分区可用性。
  2. 副本同步与恢复:
    • 当失效的 Follower 副本所在的 Broker 恢复上线,或者网络/磁盘问题解决后,该 Follower 副本会从新的 Leader 副本那里重新开始同步数据
    • 它会尝试从 Leader 的 HW 开始复制,直到追上 Leader 的 LEO,并重新回到 ISR 列表。
  3. unclean.leader.election.enable 配置:
    • 如前所述,如果所有 ISR 副本都失效,为了可用性,可以配置 unclean.leader.election.enable=true 允许从 OSR 中选举 Leader。但这会牺牲数据一致性。
  4. 增加副本因子 (Replica Factor):
    • 在 Topic 创建时设置更高的副本因子(例如 3 或 5),可以增加容错能力。即使多个副本失效,也能保证 ISR 中有足够多的副本,从而避免分区不可用。
  5. 监控和告警:
    • 密切监控 Broker 的健康状况、ISR 副本的数量(例如 kafka.server:type=ReplicaManager,name=IsrShrinksPerSec 指标),以及磁盘使用情况。
    • 当 ISR 数量低于 min.insync.replicas 或有 Broker 宕机时,及时发出告警。
  6. 分区重分配 (Partition Reassignment):
    • 如果某个 Broker 长期失效或磁盘损坏,可以考虑将它上面的分区副本手动迁移到其他健康的 Broker 上,以避免数据丢失风险和恢复服务能力。
    • 使用 kafka-reassign-partitions.sh 工具。
  7. 日志压缩 (Log Compaction) 和保留策略:
    • 对于 __consumer_offsets 等内部 Topic 或一些需要保留最新状态的 Topic,使用日志压缩可以防止日志无限增长。
    • 对于其他 Topic,合理配置 log.retention.mslog.retention.bytes,确保日志不会无限期地保留,以免耗尽磁盘空间。

20. Kafka 的哪些设计让它有如此高的性能?

Kafka 之所以能达到如此高的吞吐量和性能,得益于其一系列精妙的设计和工程实现:

  1. 顺序读写磁盘 (Sequential Disk I/O):
    • 设计: Kafka 将消息以追加 (append-only) 的方式写入日志文件(Segment Log),并采用顺序读写。
    • 优势: 顺序 I/O 相比随机 I/O 效率高出几个数量级,因为它避免了磁头的频繁寻道,几乎能达到内存读写速度。即使是 SSD,顺序写入也能发挥最佳性能。
  2. 零拷贝 (Zero-Copy):
    • 设计: 在消息传输给消费者时,Kafka 充分利用了操作系统的 sendfile 系统调用 (或类似的零拷贝技术)。
    • 优势: 避免了数据在内核空间和用户空间之间多次复制,减少了 CPU 和内存开销。数据直接从磁盘文件系统缓存发送到网络套接字,显著提高了吞吐量和降低了延迟。
  3. 批处理 (Batching):
    • 设计: 生产者在发送消息时,不是每条消息都立即发送,而是会将多条消息聚合到一起形成一个消息批次 (RecordBatch),然后一次性发送。同样,消费者也是批量拉取消息。
    • 优势: 减少了网络请求的次数和 I/O 操作的开销,分摊了 TCP/IP 协议栈的开销,提高了网络利用率。
  4. 分区 (Partitioning) 和并行度:
    • 设计: Topic 被划分为多个分区,每个分区是一个独立的日志。分区可以分布在不同的 Broker 上,甚至同一个 Broker 上的不同磁盘上。
    • 优势:
      • 可伸缩性: 允许集群水平扩展,通过增加 Broker 和分区来提高整体吞吐量。
      • 并行处理: 生产者可以并行向多个分区发送消息,消费者组可以并行消费多个分区。
  5. 消息压缩 (Message Compression):
    • 设计: Kafka 支持多种压缩算法 (如 Gzip, Snappy, LZ4, Zstandard),可以在生产者端对消息进行批量压缩,然后在 Broker 存储,消费者拉取后解压。
    • 优势: 显著减少了网络传输的数据量和磁盘存储空间,尤其对于大量重复或文本消息效果显著。
  6. 文件系统缓存 (Page Cache):
    • 设计: Kafka 严重依赖操作系统的页缓存 (Page Cache) 来缓存消息数据。
    • 优势: 大部分读写操作都在内存中完成,速度极快。操作系统会智能地管理内存,将最近访问的数据保留在缓存中,提高缓存命中率。即使重启 Broker,页缓存也能保留。
  7. 低延迟的持久化:
    • 设计: 消息写入磁盘后,并非立即 fsync 刷盘,而是异步刷盘或依赖操作系统定期刷盘。只在重要操作(如 Leader 选举、副本同步)时才确保数据持久化。
    • 优势: 减少了磁盘 IO 的阻塞,提升了写入性能。Kafka 通过副本机制来保证数据可靠性,即使少数数据未及时刷盘而 Broker 宕机,也能从其他副本恢复。
  8. 简洁的消费者和生产者 API:
    • 设计: 生产者和消费者客户端 API 简洁高效,减少了不必要的复杂逻辑和额外的开销。
    • 优势: 降低了使用门槛,也减少了客户端的计算开销。
  9. 去中心化的 Leader 选举和协调:
    • 设计: 虽然有 Controller,但每个分区的 Leader 选举和数据复制是独立的,这使得系统更加健壮和并行。Group Coordinator 也分散在不同的 Broker 上。
    • 优势: 避免了单点瓶颈。

这些设计原则和实现细节共同造就了 Kafka 作为高吞吐量、低延迟的分布式消息系统的卓越性能。