Kafka笔记
https://www.bilibili.com/video/BV1vr4y1677k?p=79&vd_source=78951f3f7dcd752bebcfd9734a584537
什么是kafka
分布式的基于发布/订阅模式的消息队列
常见的消息队列:
KafKa、ActiveMQ、RabbitMQ、RocketMQ
消息队列的应用场景:
-
缓冲/消峰:使用一个消息队列缓存消息,解决消费者消费速度慢的问题
-
解耦:不同的数据源不用单独编写特定的发送数据的代码
-
异步通信:有些任务并不需要实时阻塞处理,可以将这些任务丢进任务队列,然后顺序处理
消息队列的两种形式
-
点对点模式:消息队列中间件维护一个消息队列,生产者向其他添加数据,消费者从里面拉取数据后删除消息队列中的数据
-
发布/订阅模式:消息队列中间件内部维护多个不同主题的消息队列,消费者按照主题拉取数据后,不删除队列中的数据,以便于其他消费者也能拉取同样的数据
架构
生产者 Producer
注:同一个topic分为多个partition,它们直接没有主从关系,只有同一个partition与其副本之间有主从关系,所说的leader或follower很多时候指的是它们所在的那个broker
数据发送过程:
主线程获取到数据后,调用send方法(分为同步和异步方式,同步模式下下一批数据必须等待上一批发送成功后才发)发送数据,数据在发送前会先经过拦截器(校验、修改等),然后经过序列化器,得到序列化数据之后,数据被传送到分区器(因为一个topic可能有多个分区),分区器中有一个发送缓冲区,有多少个分区,缓冲区中就有多少个双端队列,数据并不是直接放在双端队列中等待发送,而是将数据放到一个16k大小的内存块(batch.size)中,如果该内存块满了或者超过等待时间(linger.ms)才会将该内存块放入双端队列中等待发送(就好比寄快递,快递车不会一次只运一个快递,而是将多个快递丢到车上,车满了才走)
发送数据的任务由Sender线程做,它使用NIO的方法与每个broker保存连接,kafka集群收到数据后将根据配置做出响应,配置有三个可选值:
-
0:kafka集群不需要返回响应
-
1:leader收到数据则响应成功
-
-1:leader和副本(实际不是所有的副本,而是ISR中的副本,副本可以理解为一个broker)都收到数据则响应成功
Producer收到成功响应后才会将双端队列中的数据删掉,否则会不断重试
分区 Partition
为什么要分区?
每个Partition存放在一个Broker上,如此一来分区就有两个好处:
-
从空间上看,可以将一个大的数据分开放在多台服务器上
-
从时间上看,一次就能同时生产或消费多条数据,增加了吞吐量
分区数可以大于broker数,此时一个broker就可能有多个分区,可以自动分配也能手动分配
分区策略
默认分区策略:
-
如果发送数据时指定了分区,则使用该分区
-
如果发送数据指定了一个key,则计算该key的hash,并由hash映射到某分区
-
如果没有指定key,则使用上一次使用的分区,如果上一次使用的分区内存块满了(16kb),则再随机选一个分区发送数据
自定义分区器:
创建一个类实现Partitioner接口,并实现其中的partition方法即可。
生产者如何提高吞吐量
主要是对以下4个参数的调整:
-
batch.size 和 linger.ms 两个参数的调整
-
数据压缩参数
-
缓冲区大小
如何保证发送数据的可靠性
Producer向kafka集群发送数据,可以要求集群是否返回成功结果,以及何时返回成功结果,即上文中的 0,1,-1 三个配置,可靠性最高的为-1,但会出现一些其他的问题:
-
follower挂了:由于leader要等到所有的follower响应成功之后才会判定成功,如果此时有任意follower下线了,leader就阻塞住了。
所以并不是要等所有的follower都同步成功,kafka会维护一个ISR队列,里面保存了所有的Broker id,并由心跳机制检测所有的broker,如果30s后broker仍没响应,则将其从ISR中剔除(ISR中包含leader和活着的follower所在的broker)。
响应成功时,只有ISR中所有的节点都成功,才会返回保存成功 -
数据重复:如果leader收到数据后向副本同步数据后,leader还没来得及向producer返回成功信号,挂了,此时副本就会晋升为leader,由于producer没有收到成功信号,所以就会重传,但实际上此时由副本晋升的leader以及有数据了,重传后就会发生数据重复的问题
如何保证数据不重复发送
幂等配置
kafka中有个配置 enable.idempotence 用于配置是否开启幂等性,默认开启,如果开启的话,则会:
producer生产的每一条数据都会自动加上一个由 <PID, Partition, SeqNumber> 三个元素组成的tag,当该tag以及存在时,kafka集群就会将该数据丢掉。其中,PID为kafka的重启版本号,每次重启kafka都会重置,partition则为消息的分区号,SeqNumber为一个自增的数,每次生产一个消息都会自增
事务
事务是依赖幂等配置的,用于解决kafka重启后,仍然可能重复接收数据的问题(PID重置了)
大致意思就是在调用send方法后,数据并没有真正持久化到kafka集群,而是还需要调用一个commit方法来提交该事务
如何保证发送数据顺序
默认条件下,只有单分区内数据的拉取是有序的
但是,kafka有一个机制,发送数据时类似于滑动窗口机制,它首先从发送缓冲区拿到一个数据块(16kb)发送给kafka集群,但此时如果配置的发送缓存大于1,则发送方并不会等待第一个数据成功响应才发第二个,而是直接继续发送第二个数据块,这就造成了如果接收过程中,中间某个数据块发送失败,则后续重试发送时它就不在原来顺序了
kafka的一个解决方法是,在kafka集群维护一个长度为5个数据块的接收缓冲,只有当5个连续数据块都存在于该缓冲区时,才会持久化这5个数据块。如何知道5个数据块是否连续,这就依赖幂等性了。故,这种方法只要求开启幂等性并且发送缓冲小于等于5即可
Broker
注:对于一个topic来说,broker和partition等价,所以leader partition就是leader broker
zookeeper
早期的版本kafka会将broker信息保存到zookeeper中:
broker启动流程:
一个broker上线后,会先向zookeeper的ids节点中添加其brokerid(下线后会删除),多个broker会抢夺controller身份(zk中的contrller节点),该身份用于leader的选举,controller会监听broker的变化,一旦当前的leader挂了就会将其他follower晋升,晋升规则为:kafka维护一个有序数组(AR)记录所有的brokerid,并按照该数组顺序晋升(前提是其也存在ISR中)。
服役和退役
新节点加进来并对原有topic进行负载均衡称为服役
新节点加进来后,原有的topic partition并不会使用该节点,如果要负载均衡之前的topic,大致可执行以下两步:
-
先创建一个json文件,里面指明要重新分配的topic和broker ids
-
执行 reassign-partitions –generate 脚本并指明上述json文件,该命令会生成一个重新分配分区的计划
-
如果上述生成的计划满足要求,则将其保存到一个json文件
-
执行reassion-partitions –execute 脚本,指明上述json文件,完成分区
要退役某节点时,只需要将该节点的数据转移到其他broker(使用服役操作),然后删除该节点即可
Kafka副本 Replicas
默认每个partition副本为1个,kafka中所有的操作只针对leader,follower只负责容灾备份
所有的副本称为 AR,AR=ISR+OSR,其中:
-
ISR:所有可以保持心跳连接的broker
-
OSR:没能保持连接的broker
broker down机
如果某follower故障下线后又恢复了,显然它的数据以及和kafka集群的数据不一致了,如何才能使它重新进入ISR呢?
又或者leader下线了,如何保证消费者拉取的数据没有发送变化呢?
每个broker中的follower都会维护两个变量:HW(High WaterMark)和 LEO(Long End Offset),其中HW保存所有副本中,数据长度最小的偏移量,LEO保存当前follower的最大数据接收偏移量。用户只能看到HW前的数据
如果leader down 机了,则某follower会晋升为新的Leader,由于HW在所有的Broker中相同,所以用户看到的数据也不会变,并且其他的follower也要将数据同步到HW的位置
如果follower down机后恢复了,则它会从其自己的HW处向leader同步数据
Leader Partition自平衡
由于读写请求都发生在leader节点,所以某broker中leader越多,它越忙碌,默认情况下,Kafka会自动将leader均匀分布
但有时候,如果某broker down机,则它上面的leader就会转移到其他broker上,如果它又上线,则它上面仍然都只有follower,就会引起一个不平衡。
例如,某topic的AR为 [ 0, 1, 2 ],表示有三个副本分散在0、1、2三个broker上,初始情况下,0应该就是leader(因为leader的选举是按照AR的顺序来的),但如果此时0掉线了,则1成为新的leader,后续0又上线了,此时AR的首broker就与leader所在的broker不一致了,称之为1个不平衡,不平衡率为1/3
Kafka规定,每过300s检测一下平衡率,如果平衡率大于10%,则会发起自平衡机制,重新分配leader所在的broker。
文件存储机制
每个partition在broker中都以目录的方式存在,例如 data/topic-partition_num
该目录下存放三个文件:
-
00001024.log:保存实际是数据
-
00001024.index:保存索引数据所在log文件的位置
-
00001024.timeindex:kafka默认数据保持7天,过期删除,所以需要记录每条数据的创建时间
这三个文件组成一个segment,kafka规定,为了便于索引,一个segment不能大于1G。一个segment写满后,重新创建一个segment,并将起始的index偏移位置作为文件名保存(即上面的00001024)
稀疏索引
实际上,kafka并不会真的对每一条数据都在 index 文件中建立索引,这样可能是的该文件过大,不利于检索。
kafka会在记录1kb数据后才在index文件中记录一个索引,如果有检索请求进来,首先根据文件名找到对应哪一个segment,然后再根据该segment中的index文件找到目标index所在日志文件中大致的范围,然后,根据该范围的起始index进行搜索。整个过程类似于跳跃表
文件删除
kafka根据配置的检测时间来检测数据保存了多久(segment中最新的数据保存了多久),如果超过阈值则会进行删除segment,删除方式有两种:
-
物理删除segment
-
根据消息的key进行压缩,只保留相同key的最新值,例如用户年龄,每年都在+1,则只保留最新的
Kafka为什么这么快?
-
分布式架构,提高了吞吐量
-
稀疏索引,提高了索引速度
-
顺写日志
-
零拷贝和页缓存(见下)
页缓存:此为操作系统提供的功能,磁盘的读写会有一片内存区域作为页缓存,Kafka则只负责读写页缓存,而不关心数据的持久化策略,如此一来,就是纯内存操作
零拷贝:kafka作为消息中间件,它并不处理消息,所以当有消息过来的时候它只需要将其放到页缓存中,当有消费者读数据时,由于它不需要处理数据,所以直接返回页缓存的位置即可,避免内核态数据向用户态复制
消费者
kafka使用pull的方式,消费者自己从kafka集群拉取数据,这样可以适用不同消费速度的消费者。缺陷是如果kafka没有数据,则消费者会一直拉取null
消费者从partition中拉取数据,partition也会记录每个消费者拉取数据的offset用于防止消费者挂掉了。
早期该offset存储在zookeeper中,但如此一来对zookeeper压力较大,故后期kafka将offset保存在系统topic __consumer_offsets 中(该主题会被划分为50个分区进行存储),不用zk了。
__consumer_offsets 采用kv方式存储,其中k为 group_id + topic + partition,v为offset,每过一段时间,kafka就会对该主题进行压缩
offset可以自动提交,但它是隔一段时间自动提交一次,可能重复消费,所以也可以设定手动提交,手动提交也有同步和异步方式,也可以直接指定offset进行消费
消费者组
为什么要有消费者组?
如果一个消费者需要同时消费多个分区的数据,如果它轮询消费各个分区就比较慢,此时就可以为每个分区创建一个消费者,它们共同组成一个大的消费者,称为消费者组
消费者组由多个消费者组成,限制在于,消费者组中的消费者不能同时消费同一个分区的数据,事实上,可以简单将一个消费者组看作是一个消费者,一个消费者总不能同时消费一个分区的数据。如果消费者组中的消费者数量比partition都多,则有些消费者会处于空闲状态
当多个消费者指定了同一个 group-id 时,它们就处于一个消费者组
分区分配策略
如何将多个分区分配给消费者组中的多个消费者呢?kafka提供了四种策略:
-
Range:将partition和消费者分别指定序号,然后根据平均分配的策略给每个消费者分配一个范围的partition。
一个很大的问题是会产生数据倾斜:例如,如果有很多的topic需要进行分配,则第一个消费者永远都会被分配到消费任务,其压力会很大。并且,当某消费者下线后,通过再分配策略(下文重点),在Range模式下会将该消费者的消费任务统一分配给另一个消费者 -
Roundrobin:将所有topic的partition都列到一起进行编号,消费者组中的消费者也进行编号,根据partition编号的取消费者编号的模进行分配
-
Sticky:和Range策略类似,只不过不是给消费者分配一个范围的partition,而是随机选取该范围数的partition。如此一来,就算有多个topic需要分配,因为每次选择的都是随机的,所以不会让某一个消费者压力很大
再平衡
需百度补充
为了防止消费者下线后引起的消费者压力不平衡的情况,当出现以下两个条件时,kafka会为消费者组里面的消费者重新分配分区:
-
消费者45秒内未与kafka集群中的协调器(用于管理消费者分区)进行心跳连接的
-
消费者消费一条数据大于5分钟的
数据积压
消费者消费能力不足怎么办?
-
增加消费者
-
增加消费者一次拉取数据的数量(消费者可以一次拉取多条数据)