Kafka 总结

请说明什么是Apache Kafka?

Apache Kafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和重复的日志服务

如何获取 topic 主题的列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

生产者和消费者的命令行是什么?

生产者在主题上发布消息:

bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topicHello-Kafka

注意这里的 IP 是 server.properties 中的 listeners 的配置。接下来每个新行就是输入一条新消息。

消费者接受消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning

Kafka 都有哪些特点?

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写

请简述下你在哪些场景下会选择 Kafka?

日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和 Flink

Kafka 的设计架构你知道吗?

简单架构如下
img_7.png
详细如下
img_8.png
Kafka 架构分为以下几个部分

Producer :消息生产者,就是向 kafka broker 发消息的客户端。
Consumer :消息消费者,向 kafka broker 取消息的客户端。
Topic :可以理解为一个队列,一个 Topic 又分为一个或多个分区,
Consumer Group:这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 Consumer Group。
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker上,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 consumer,kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。

Kafka 分区的目的?

分区对于 Kafka 集群的好处是:实现负载均衡。分区对于消费者来说,可以提高并发度,提高效率。

请说明什么是传统的消息传递方法?

传统的消息传递方法包括两种:

  • 队列:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人。
  • 发布-订阅:在这个模型中,消息被广播给所有的用户。

请说明Kafka相对于传统的消息传递方法有什么优势?

高性能:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作,Kafka性能远超过传统的ActiveMQ、RabbitMQ等,而且Kafka支持Batch操作;

  • 可扩展:Kafka集群可以透明的扩展,增加新的服务器进集群;
  • 容错性: Kafka每个Partition数据会复制到几台服务器,当某个Broker失效时,Zookeeper将通知生产者和消费者从而使用其他的Broker。

解释下Kafka中位移(offset)的作用

在Kafka中,每个主题分区下的每条消息都被赋予了一个唯一的ID数值,用于标识它在分区中的位置。这个ID数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能被修改。

kafka 为什么那么快?

  • Cache Filesystem Cache PageCache缓存
  • 顺序写:由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快
  • Zero-copy:零拷技术减少拷贝次数
  • Batching of Messages:批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限
  • Pull 拉模式:使用拉模式进行消息的获取消费,与消费端处理能力相符。

Kafka的用途有哪些?使用场景如何?

Kafka 持久化日志,这些日志可以被重复读取和无限期保留

Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性

Kafka 支持实时的流式处理

Kafka是一个分布式的消息系统,作为消息系统,他具备系统解耦,冗余存储,流量削峰,缓冲、异步通信等功能。同时他还是一个存储系统和流式处理平台,作为存储系统他能够把消息持久化到磁盘,降低数据丢失的风险;作为流式处理平台,不仅能够为各个流式处理框架提供稳定的数据来源,还提供了一些流式处理的库

在Kafka中broker的意义是什么?

在Kafka集群中,broker指Kafka服务器。
img.png

Kafka服务器能接收到的最大信息是多少?

Kafka服务器可以接收到的消息的最大大小是1000000字节。

在Kafka中,ZooKeeper的作用是什么?

目前,Kafka使用ZooKeeper存放集群元数据、成员管理、Controller选举,以及其他一些管理类任务。之后,等KIP-500提案完成后,Kafka将完全不再依赖于ZooKeeper。

“存放元数据”是指主题分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他 “人” 都要与它保持对齐。
“成员管理” 是指 Broker 节点的注册、注销以及属性变更,等等。
“Controller 选举” 是指选举集群 Controller,而其他管理类任务包括但不限于主题删除、参数配置等。
KIP-500 思想,是使用社区自研的基于Raft的共识算法,替代ZooKeeper,实现Controller自选举。

Kafka中的ZooKeeper是什么?Kafka是否可以脱离ZooKeeper独立运行?

Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。

不可以,不可能越过Zookeeper直接联系Kafka broker,一旦Zookeeper停止工作,它就不能服务客户端请求。

Zookeeper主要用于在集群中不同节点之间进行通信,在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,
除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

解释Kafka的用户如何消费信息?

在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节Socket转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。

解释如何提高远程用户的吞吐量?

如果用户位于与broker不同的数据中心,则可能需要调优Socket缓冲区大小,以对长网络延迟进行摊销。

为什么要使用 kafka?

  • 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
  • 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
  • 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
  • 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
  • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Kafka消费过的消息如何再消费?

kafka消费消息的offset是定义在zookeeper中的,
如果想重复消费kafka的消息,可以在redis中自己记录offset的checkpoint点(n个),当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offset重设,这样就可以达到重复消费消息的目的了

kafka的数据是放在磁盘上还是内存上,为什么速度会快?

kafka使用的是磁盘存储。

速度快是因为:

  • 顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
  • Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。
  • Kafka高效文件存储设计:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位
    message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件),
    可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

注:

  1. Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的
    Message的时候,用二分查找就可以定位到该Message在哪个段中。
  2. 为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。
    为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。

请说明Kafka 的消息投递保证(delivery guarantee)机制以及如何实现?

Kafka支持三种消息投递语义:

① At most once 消息可能会丢,但绝不会重复传递
② At least one 消息绝不会丢,但可能会重复传递
③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。
如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。
可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。
如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。
但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

  • 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once。
  • 读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于At least once。
  • 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交,但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方。
    比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。
    (目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。
    总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once,而Exactly once要求与目标存储系统协作,Kafka提供的offset可以较为容易地实现这种方式。

如何保证Kafka的消息有序

kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的。

Kafka对于消息的重复、丢失、错误以及顺序没有严格的要求。

Kafka只能保证一个partition中的消息被某个consumer消费时是顺序的,事实上,从Topic角度来说,当有多个partition时,消息仍然不是全局有序的。

kafka 的 ack 机制

request.required.acks 有三个值 0 1 -1

  • 0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据

  • 1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他
    不确保是否复制完成新 leader 也会导致数据丢失

  • -1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的 ack,这样数据不会丢失

kafka数据丢失问题,及如何保证?

首先需要弄明白消息为什么会丢失,对于一个消息队列,会有 生产者、MQ、消费者 这三个角色,在这三个角色数据处理和传输过程中,都有可能会出现消息丢失。
img_3.png

生产者数据传输导致的消息丢失

对于生产者数据传输导致的数据丢失主常见情况是生产者发送消息给 Kafka,由于网络等原因导致消息丢失,对于这种情况也是通过在 producer 端设置 acks=all 来处理,这个参数是要求 leader 接收到消息后,需要等到所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试。

Kafka 导致的消息丢失

Kafka 导致的数据丢失一个常见的场景就是 Kafka 某个 broker 宕机,,而这个节点正好是某个 partition 的 leader 节点,这时需要重新重新选举该 partition 的 leader。如果该 partition 的 leader 在宕机时刚好还有些数据没有同步到 follower,此时 leader 挂了,在选举某个 follower 成 leader 之后,就会丢失一部分数据。

对于这个问题,Kafka 可以设置如下 4 个参数,来尽量避免消息丢失:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本;
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个参数的含义是一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 节点。
  • 在 producer 端设置 acks=all,这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了;
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个参数的含义是一旦写入失败,就无限重试,卡在这里了

消费者异常导致的消息丢失

消费者可能导致数据丢失的情况是:消费者获取到了这条消息后,还未处理,Kafka 就自动提交了 offset,这时 Kafka 就认为消费者已经处理完这条消息,其实消费者才刚准备处理这条消息,这时如果消费者宕机,那这条消息就丢失了。

消费者引起消息丢失的主要原因就是消息还未处理完 Kafka 会自动提交了 offset,那么只要关闭自动提交 offset,消费者在处理完之后手动提交 offset,就可以保证消息不会丢失。但是此时需要注意重复消费问题,比如消费者刚处理完,还没提交 offset,这时自己宕机了,此时这条消息肯定会被重复消费一次,这就需要消费者根据实际情况保证幂等性。

kafka的balance是怎么做的?

生产者将数据发布到他们选择的主题。生产者可以选择在主题中分配哪个分区的消息。
这可以通过循环的方式来完成,只是为了平衡负载,或者可以根据一些语义分区功能(比如消息中的一些键)来完成。更多关于分区在一秒钟内的使用。

Kafka 消息是采用 Pull 模式,还是 Push 模式?

生产者使用push模式将消息发布到Broker,消费者使用pull模式从Broker订阅消息。

push模式很难适应消费速率不同的消费者,如果push的速度太快,容易造成消费者拒绝服务或网络拥塞;如果push的速度太慢,容易造成消费者性能浪费。但是采用pull的方式也有一个缺点,就是当Broker没有消息时,消费者会陷入不断地轮询中,为了避免这点,kafka有个参数可以让消费者阻塞知道是否有新消息到达。

kafka的消费者方式?

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞。

Kafka数据怎么保障不丢失?

分三个点说,一个是生产者端,一个消费者端,一个broker端

生产者数据的不丢失

kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。

如果是同步模式:
ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1

如果是异步模式
也会考虑ack的状态,除此之外,异步模式下的有个buffer,通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,
有个选项是配置是否立即清空buffer。可以设置为-1,永久阻塞,也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这是特别的例外情况。

注:
ack=0:producer不等待broker同步完成的确认,继续发送下一条(批)信息。
ack=1(默认):producer要等待leader成功收到数据并得到确认,才发送下一条message。
ack=-1:producer得到follwer确认,才发送下一条数据。

消费者数据的不丢失

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。

而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。

唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置
KafkaSpoutConfig.bulider.setGroupid的时候设置成了一样的groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。为了保证每个组都独享一份消息数据,groupid一定不要重复才行。

kafka集群中的broker的数据不丢失

每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。

采集数据为什么选择kafka?

采集层 主要可以使用Flume, Kafka等技术。

  • Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.

  • Kafka:Kafka是一个可持久化的分布式的消息队列。Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。

相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。

所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

kafka 重启是否会导致数据丢失?

kafka是将数据写到磁盘的,一般数据不会丢失。
但是在重启kafka过程中,如果有消费者消费消息,那么kafka如果来不及提交offset,可能会造成数据的不准确(丢失或者重复消费)。

kafka 宕机了如何解决?

  • 先考虑业务是否受到影响
    kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
  • 节点排错与恢复
    想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。

为什么Kafka不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

  • 数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  • 延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

而kafka的主写主读的优点就很多了:

1.可以简化代码的实现逻辑,减少出错的可能;
2.将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
3.没有延时的影响;
4.在副本稳定的情况下,不会出现数据不一致的情况。

Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么?

  • ISR:In-Sync Replicas 副本同步队列
  • AR:Assigned Replicas 所有副本

    ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度,当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。

AR=ISR+OSR。

分区中的所有副本统称为**AR(Assigned Replicas)
所有与 leader副本保持一定程度同步的副本(包括 leader副本在内)组成
ISR(In-Sync Replicas)**,ISR集合是AR集合中的一个子集。
消息会先发送到 leader副本,然后 follower副本才能从 leader副本中拉取消息进行同步,同步期间内 follower副本相对于 leader副本而言会有一定程度的滞后。

前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与 leader副本同步滞后过多的副本(不包括 leader副本)组成**OSR(Out-of-Sync Replicas)**,由此可见,AR=ISR+OSR。

在正常情况下,所有的 follower副本都应该与 leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。

ISR的伸缩是指leader副本负责跟踪ISR集合中所有follower副本的滞后状态,有follower副本滞后太多的时候将他从ISR中剔除,OSR集合中有follower副本”追上“了leader副本将其加入ISR集合中

Kafka中的HW、LEO、LSO、LW等分别代表什么?

HW是 High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

LEO是 Log End Offset的缩写它标识当前日志文件中下一条待写入消息的 offset。LEO的大小相当于当前日志分区中最后一条消息的 offset值加1。
分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。
LSO(Last Stable Offset) 对未完成的事务而言,LSO 的值等于事务中第一条消息的位(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同
LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

Kafka中是怎么体现消息顺序性的?

通过Topic和Partition来提现,Topic是消息归类的主题是逻辑概念,Topic下有多个Partition,其在存储层面可以看成是一个可追加的日志文件,消息在被追加到Partition日志文件中的时候会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过offset来保证消息在分区内的顺序性

Kafka 如何保证消息的顺序性

在某些业务场景下,我们需要保证对于有逻辑关联的多条MQ消息被按顺序处理,比如对于某一条数据,正常处理顺序是新增-更新-删除,最终结果是数据被删除;如果消息没有按序消费,处理顺序可能是删除-新增-更新,最终数据没有被删掉,可能会产生一些逻辑错误。对于如何保证消息的顺序性,主要需要考虑如下两点:

  • 如何保证消息在 Kafka 中顺序性;
  • 如何保证消费者处理消费的顺序性。

    如何保证消息在 Kafka 中顺序性

    对于 Kafka,如果我们创建了一个 topic,默认有三个 partition。生产者在写数据的时候,可以指定一个 key,比如在订单 topic 中我们可以指定订单 id 作为 key,那么相同订单 id 的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。消费者从 partition 中取出来数据的时候,也一定是有顺序的。通过制定 key 的方式首先可以保证在 kafka 内部消息是有序的。

如何保证消费者处理消费的顺序性

对于某个 topic 的一个 partition,只能被同组内部的一个 consumer 消费,如果这个 consumer 内部还是单线程处理,那么其实只要保证消息在 MQ 内部是有顺序的就可以保证消费也是有顺序的。但是单线程吞吐量太低,在处理大量 MQ 消息时,我们一般会开启多线程消费机制,那么如何保证消息在多个线程之间是被顺序处理的呢?对于多线程消费我们可以预先设置 N 个内存 Queue,具有相同 key 的数据都放到同一个内存 Queue 中;然后开启 N 个线程,每个线程分别消费一个内存 Queue 的数据即可,这样就能保证顺序性。当然,消息放到内存 Queue 中,有可能还未被处理,consumer 发生宕机,内存 Queue 中的数据会全部丢失,这就转变为上面提到的如何保证消息的可靠传输的问题了。

Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

分区器、序列化器、拦截器都是生产者客户端中的东西。

  • 分区器是将消息发送给指定的分区的,如果在发送的消息中指定了Partition,就不需要分区器了,默认的分区器中对key进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),同时也可以自定义分区器
  • 序列化器把消息对象转换成字节数组,这样才能够通过网络发送给Kafka。
  • 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
  • 他们之间工作的顺序是 拦截器 -> 序列化器 -> 分区器

Kafka生产者客户端的整体结构是什么样子的?

img_1.png

Kafka生产者客户端中使用了几个线程来处理?分别是什么?

使用了两个线程来进行处理:主线程和Sender线程。

  • 主线程负责由KafkaProducer创建消息,通过拦截器、序列化器和分区器作用以后缓存到消息累加器;
  • Sender线程负责从消息累加器中获取消息并将其发送到Kafka中

“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果正确,那么有没有什么hack的手段?

正确的,可以让多个消费线程消费同一个分区来hack,通过assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区的限制。

消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

提交的是offset + 1,表示下一条需要拉取的消息的位置

有哪些情形会造成重复消费?

  • 位移提交动作在消费完所有拉取到的消息后才执行会造成重复消费,因为批量拉取一批消息以后,中间消费处理的过程中可能会出现异常,这样的会前面消费的消息就会再消费一次

  • Rebalance 的时候也会出现,一个分区在原有消费者的位移没有上传的时候分配给另外一个消费者,另外一个消费会从上一次位移的地方继续拉取消息进行消费,这样就造成了消息的重复消费,后续可以通过添加Rebalance的监听器来做一些Rebalance的工作来解决位移未提交的问题

那些情景下会造成消息漏消费?

位移提交动作在消费消息之前会造成消息漏消费,因为提交新的位移以后,可能拉取到的那一批消息中间出现异常,那么那一批消息后面的那一部分消息就不会被消费到,这样就导致了消息的漏消费

KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

第一种方式:线程封闭,每个线程实例化一个KafkaConsumer对象,一个消费线程可以消费一个或者多个分区中的消息,所有的消费线程都隶属于同一个消费者组。
第二种方式:多个消费线程同时消费同一个分区,通过assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区的限制,进一步提高消费能力。但是这种方式会导致位移提交和顺序控制的处理变得更加复杂。

什么是消费者组?

消费者组是Kafka独有的概念,即消费者组是Kafka提供的可扩展且具有容错性的消费者机制。

但实际上,消费者组(Consumer Group)其实包含两个概念,作为队列,消费者组允许你分割数据处理到一组进程集合上(即一个消费者组中可以包含多个消费者进程,他们共同消费该topic的数据),这有助于你的消费能力的动态调整;作为发布-订阅模型(publish-subscribe),Kafka允许你将同一份消息广播到多个消费者组里,以此来丰富多种数据使用场景。

需要注意的是:在消费者组中,多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有相同的组ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起它负责消费的分区。 因此,消费者组在一定程度上也保证了消费者程序的高可用性。

img_6.png

简述消费者与消费组之间的关系

每一个消费者都属于一个消费者组中,消费者组能够消费到一个主题中的所有消息(实现多播),然后把这些消息通过Partition负载均衡分配给具体的消费者进行消费。
img_2.png

Kafka 的设计是什么样的?

Kafka 将消息以 topic 为单位进行归纳

将向 Kafka topic 发布消息的程序成为 producers.

将预订 topics 并消费消息的程序成为 consumer.

Kafka 以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker.

producers 通过网络将消息发送到 Kafka 集群,集群向消费者提供消息

Kafka 如何保证高可用?

副本机制

Kafka 的基本架构组成是:由多个 broker 组成一个集群,每个 broker 是一个节点;当创建一个 topic 时,这个 topic 会被划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 只存放一部分数据。

这就是天然的分布式消息队列,就是说一个** topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据**。

在 Kafka 0.8 版本之前,是没有 HA 机制的,当任何一个 broker 所在节点宕机了,这个 broker 上的 partition 就无法提供读写服务,所以这个版本之前,Kafka 没有什么高可用性可言。

在 Kafka 0.8 以后,提供了 HA 机制,就是 replica 副本机制。每个 partition 上的数据都会同步到其它机器,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,消息的生产者和消费者都跟这个 leader 打交道,其他 replica 作为 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。Kafka 负责均匀的将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

img_4.png

拥有了 replica 副本机制,如果某个 broker 宕机了,这个 broker 上的 partition 在其他机器上还存在副本。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从其 follower 中重新选举一个新的 leader 出来,这个新的 leader 会继续提供读写服务,这就有达到了所谓的高可用性。

写数据的时候,生产者只将数据写入 leader 节点,leader 会将数据写入本地磁盘,接着其他 follower 会主动从 leader 来拉取数据,follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。

消费数据的时候,消费者只会从 leader 节点去读取消息,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

img_5.png

截断机制

如果出现 Leader 故障下线的情况,就需要从所有的 Follower 中选举新的 Leader,以便继续提供服务。为了保证一致性,通常只能从 ISR 列表中选取新的 Leader (上面已经介绍,ISR 列表中的 Follower 与原 Leader 保持同步),因此,无论 ISR 中哪个 Follower 被选为新的 Leader,它都知道 HW 之前的数据,可以保证在切换了 Leader 后,Consumer 可以继续“看到”之前已经由 Producer 提交的数据。

如下图所示,如果 Leader 宕机,Follower1 被选为新的 Leader,而新 Leader (原 Follower1 )并没有完全同步之前 Leader 的所有数据(少了一个消息 6),之后,新 Leader 又继续接受了新的数据,此时,原本宕机的 Leader 经修复后重新上线,它将发现新 Leader 中的数据和自己持有的数据不一致,怎么办呢?

为了保证一致性,必须有一方妥协,显然旧的 Leader 优先级较低,因此, 它会将自己的数据截断到宕机之前的 HW 位置(HW 之前的数据,与 Leader 一定是相同的),然后同步新 Leader 的数据。这便是所谓的 “截断机制”。

img_9.png

描述下 Kafka 中的领导者副本(Leader Replica)和追随者副本(Follower Replica)的区别

Kafka副本当前分为领导者副本和追随者副本。只有Leader副本才能对外提供读写服务,响应Clients端的请求。Follower副本只是采用拉(PULL)的方式,被动地同步Leader副本中的数据,并且在Leader副本所在的Broker宕机后,随时准备应聘Leader副本。

加分点:

  • 强调Follower副本也能对外提供读服务。自Kafka 2.4版本开始,社区通过引入新的Broker端参数,允许Follower副本有限度地提供读服务
  • 强调Leader和Follower的消息序列在实际场景中不一致。通常情况下,很多因素可能造成Leader和Follower之间的不同步,比如程序问题,网络问题,broker问题等,短暂的不同步我们可以关注(秒级别),但长时间的不同步可能就需要深入排查了,因为一旦Leader所在节点异常,可能直接影响可用性

注意:之前确保一致性的主要手段是高水位机制(HW),但高水位值无法保证Leader连续变更场景下的数据一致性,因此,社区引入了Leader Epoch机制,来修复高水位值的弊端。

分区Leader选举策略有几种?

分区的Leader副本选举对用户是完全透明的,它是由Controller独立完成的。你需要回答的是,在哪些场景下,需要执行分区Leader选举。每一种场景对应于一种选举策略。

  • OfflinePartition Leader选举每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
  • ReassignPartition Leader选举当你手动运行kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
  • PreferredReplicaPartition Leader选举:当你手动运行kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
  • ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。

这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。

Kafka的哪些场景中使用了零拷贝(Zero Copy)?

在Kafka中,体现Zero Copy使用场景的地方有两处:基于mmap的索引和日志文件读写所用的TransportLayer。

先说第一个。索引都是基于MappedByteBuffer的,也就是让用户态和内核态共享内核态的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap虽然避免了不必要的拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap的创建和销毁成本可能是不一样的。很高的创建和销毁开销会抵消Zero Copy带来的性能优势。由于这种不确定性,在Kafka中,只有索引应用了mmap,最核心的日志并未使用mmap机制。

再说第二个。TransportLayer是Kafka传输层的接口。它的某个实现类使用了FileChannel的transferTo方法。该方法底层使用sendfile实现了Zero Copy。对Kafka而言,如果I/O通道使用普通的PLAINTEXT,那么,Kafka就可以利用Zero Copy特性,直接将页缓存中的数据发送到网卡的Buffer中,避免中间的多次拷贝。相反,如果I/O通道启用了SSL,那么,Kafka便无法利用Zero Copy特性了。

当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?

创建:在zk上/brokers/topics/下节点 kafkabroker会监听节点变化创建主题
删除:调用脚本删除topic会在zk上将topic设置待删除标志,kafka后台有定时的线程会扫描所有需要删除的topic进行删除

Kafka 分区数可以增加或减少吗?为什么?

我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。
Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。

topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

可以

topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

不可以

创建topic时如何选择合适的分区数?

根据集群的机器数量和需要的吞吐量来决定适合的分区数

Kafka目前有那些内部topic,它们都有什么特征?各自的作用又是什么?

__consumer_offsets 以下划线开头,保存消费组的偏移

优先副本是什么?它有什么特殊的作用?

优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader

Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理

创建主题时
如果不手动指定分配方式 有两种分配方式

消费组内分配

怎么解决rebalance中遇到的问题呢?

要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 主要发生的时机有三个:

  • 组成员数量发生变化

  • 订阅主题数量发生变化

  • 订阅主题的分区数发生变化

后两个我们大可以人为的避免,发生rebalance最常见的原因是消费组成员的变化。

消费者成员正常的添加和停掉导致rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group。从而导致rebalance。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过Consumer 端的参数 session.timeout.ms进行配置。默认值是 10 秒。

除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。

通过上面的分析,我们可以看一下那些rebalance是可以避免的:

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)

设置 session.timeout.ms = 6s。设置 heartbeat.interval.ms = 2s。要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。

请谈一谈 Kafka 数据一致性原理

一致性就是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。

img_10.png

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

谈一谈 Kafka 的再均衡

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用

参考文章

评论