基本概念

HW(HighWater)

高水位,表示已经提交(commit)的最大日志偏移量(offset),已提交是指 ISRs 中所有节点都已同步到这条日志

LEO(Log endOffset)

日志最后偏移量,表示日志中下一条待写入消息的Offset

ISRs

每个 partition 有一个 leader 和多个 follower(副本因子>1),其中日志状态保持和 leader 同步的 follower 集合被称为 ISRs,于其对应的有落后于 leader 日志状态的副本集合称为 OSRs,ISRs + OSRs = AR(All Replication)


一、为什么不使用Quorum策略?

Quorum 机制

Quorum 机制对集群节点数量有要求,如果需要容忍N个节点故障,集群整体需要2N+1个节点;Quorum 主要能应对集群出现 脑裂问题

ISRs 机制

ISR 机制,如果要容忍N个节点故障,只需保证 ISR 中存在N+1个节点;ISR通过zookeeper来避免来保证leader的唯一

二、数据一致性如何保证?

  1. 关闭unclean,保证新的Leader副本,是从ISRs集合中进行选举
  2. 设置ISRs的最小副本数 >= 2(ISR中包括Leader副本)

以下两类特殊的数据一致性问题,通过引入Leader Epoch机制解决(类似Raft的Terms)

日志丢失问题

log_miss

  1. 某一个时刻,Leader A收到 producer 的消息m2,并成功写入本地Page Cache中。Follower B拉取到m2,写入Page Cache但未刷盘。同步给Leader A HW=2,Leader A更新自身HW=2。此时,A、B同时崩溃
  2. B先恢复,并成为leader 节点,A 恢复后成为follower,HW=2所以不需要截断自身的日志。
  3. Leader B 收到了 producer 的m3消息,此时offset=1的日志,两个节点出现了不一致现象。(A=m2, B=m3)

引入Leader Epoch解决

两个问题产生的原因,都是follower节点在异常情况恢复后,通过自身的HW来决定日志的状态。其实,上面分析的场景和raft中的日志恢复类似,raft中的follower是和leader的日志不一致时,会以leader的日志为准进行日志恢复。而raft中的日志恢复很重要的一点是follower根据leader任期号进行日志比对,快速进行日志恢复,follower需要判断新旧leader的日志(可能由于分区等问题,短暂出现2个leader的情况),以最新leader的数据为准。
这里的leader epoch和raft中的 term 类似,用一个严格单调递增的id来标志。follower每次奔溃重启后,都需要去leader那边确认下当前leader的日志是从哪个offset开始的。

介绍这几个概念各自的用处:

  • Leader Epoch:Leader纪元,单调递增的int值。
  • Leader Epoch Start Offset:Leader的第一个日志偏移,也标志了旧Leader最后一条日志的偏移
  • Leader Epoch Request:Follower向Leader发送请求时,Leader会判断当前纪元是否是自身的epoch,如果是则返回自己的LEO,否则返回下一个纪元的Leader Epoch Start Offset,follower通过这个值做日志的截断处理

    日志截断仅会发生在 follower 节点上

三、如何保证高性能?

  • 多副本:多副本机制,带来多节点的并发能力。由于内部的优先副本分配策略,可以尽可能的保证做到Leader副本的负载均衡(Leader副本被均匀的分配在不同的broker节点)。
  • 磁盘的顺序读写:由于topic的每个partition的消息是不可变的,新消息写入时,不断追加到partition日志的末尾。因此,可以利用磁盘顺序写的特点。数据的删除,因为每个log 被划分为多个segment,每个segment对应一个物理文件,通过删除文件的方式清理partition内的数据。
  • 利用Page Cache:引入Page Cache,由OS来决定刷盘的时机。Page Cache的使用,可以将多个非连续、小块写操作合并,提高磁盘的写入效率,同时减少对文件系统的频繁调用。Kafka本身是Java语言栈,如果使用堆内存做Cache,在kafka进程重启时,数据会丢失,而且堆内内存的占用比数据本身占用的内存大(因为有结构、辅助字段等必要信息存储)。
  • 零拷贝技术:生产者写入数据时,通过mmap提升写数据消息日志落盘的性能;消费者读取消息时。
  • 批量处理:在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO传输。kafka的producer在收到消息时,在积累足够多的消息或等待足够长的时间后,再发送到broker,批处理分摊了网络传输开销,提升带宽利用率,类似 TCP 的 Nagle 算法。
  • 数据压缩:合并消息,减少数据体积

四、如何保证消息的幂等性和顺序性?

幂等性

一般消息系统需要具备三种常见的语义,at most once(至多一次),at least once(至少一次),exactly once(恰好一次),大多数系统都可以做到at most once 和 at least once。

  1. PID (producer ID),用来表示每个 producer 的唯一性(在开启幂等时,每次发送给 broker 时消息中都携带)
  2. sequence numbers,producer 发送给 broker 的每条消息都会带响应的 sequence number,逐次递增
    在配置 enable.idempotence = true 时(开启幂等配置),通过 pid + sequence number,保证了同一个 producer 在 topic-partition 维度的幂等性

顺序性

kafka producer 采用异步发送机制。KafkaProducer.send(ProducerRecord) 方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将满足条件的消息封装到某个 batch 中然后发送出去。在这个过程中有一个数据丢失的窗口:若IO线程发送之前 producer 挂掉了,累积在 Accumulator 中的数据的确有可能会丢失。而且当设置 max.in.flight.requests.per.connection > 1 并且 retries >= 1 时,发送到同一个 topic-partition 中的消息中,可能由于网络等其他问题,导致实际顺序与写入顺序不一致,max.in.flight.requests.per.connection = 1 保证有序。
在开启了幂等的情况下,可以保证写入的顺序性

工作流程

未设置幂等时

unidempotence

设置幂等时

idempotence