前言
对于一家网络公司来说,可能有大量的数据产生,例如用户登录、浏览、点击等等事件,或者系统内部的服务调用、错误信息、服务器负载等等日志信息。这些数据可以用于搜索相关、推荐系统、广告相关等等,然而这些数据很多时候是和应用耦合在一起的。同时这些额外的日志数据,可能数据量比本身应用的数据还多,Facebook 每天都能从用户活动事件中产生 6TB 的数据。
传统的方法将这些数据存储到数仓中,后续进行离线的分析和计算,但是实时的分析和计算是需要的,所以 kafka 的提出就是为了解决这一需求。他提供了一种分布可拓展的高吞吐量实时消息队列,使得实时的数据分析成为可能。
总结地来说,Kafka 需要有以下特点:
- 高吞吐以支持海量数据流
- 低延时以满足部分高要求的场景
- 支持分区、分布式,以充分利用集群的性能
- 要有容错性,以应对分布式环境的常见问题
相关工作
例如 IBM 的 Websphere MQ 具有原子性保证和 JMS 有 ack 确认,但是在这样的场景中,一致性并不需要那么强的保证,丢失一些用户的点击数据并不会造成什么严重的后果。为了保证一致性,会导致系统复杂度上升,同时也限制了性能。
另外这些方法也没有很好地支持分布式和高吞吐,对于堆积的消息也没有很好的处理方法,当消息没有及时被消费产生堆积时,性能就会急剧下降。
实际上有一些系统提出来收集大量的日志信息,例如 Facebook 的 Scribe,将日志信息存储至一系列的 Scribe 节点,随后写入到 HDFS 中提供后续的分析。Flume 通过 pipes 和 sinks 可以灵活的处理流式的数据,同时也支持了分布式。但是这些方法通常是以推送(push)的方式将日志文件推送到各个节点上,但是也许拉取(pull)的方式更适合,每个节点主动去垃取自己可以处理的消息,从而不会向某个节点推送过多的内容,导致其无法处理。
kafka 结构
kafka 的流程中有以下几个角色:
- producer:数据的生产者,产生一些日志文件,并推送到指定的 topic 上
- topic:某一种流式的数据,生产者可以向一个 topic 发布数据
- broker:topic 中发布的数据实际存储的地方,会有多个 broker 服务器
- consumer:可以订阅一个或多个 topic,实际上就是订阅 broker 服务器,消费者从 broker 服务器中拉取进行消费
Kafka 应用
Kafka 中有四个核心的 API 来实现不同的应用场景:
- Producer API:允许一个应用程序发布一串流式数据到一个或多个 Kafka topic 上
- Consumer API:允许一个应用程序订阅一个活多个 topic,并且对发布给它们的流式数据进行处理
- Streams API:允许一个应用程序作为一个流处理器,消费一个或多个 topic 产生的输入流,然后生产一个输入流到一个或多个 topic 中,实际上是对流式数据进行某种中间处理
- Connector API:允许接入一些现有的系统,例如 Kafka 消息写入到 Mysql 中(消费者角色),或者 Mysql 的一些 log 文件作为实时消息发布到 Kafka 中(生产者角色)
单分区单消费者设计
kafka 中的 topic 的每一个分区都对应一个逻辑上的日志,物理上,一个日志可能划分成多个文件,这些文件差不多都是一个大小。每次生产者向该分区发布一个消息,broker 只会将该信息 append 到最后一个文件后面。为了更好的性能,只有当划分的文件数达到一定数量或过去一定时间之后才会写入到磁盘中,且只有消息写入磁盘之后,才对消费者可见。
每个消息在 kafka 中并没有一个消息 id,消息本身是使用 offset 来进行区分,也就是对于一个逻辑日志,其偏移量是多少,就可以唯一确定一个消息。这样可以避免维护 ID 及 ID 到实际偏移量的开销。在 kafka 中,id 指的就是 offset,二者是同一个概念。为了计算下一个消息的 offset,需要中当前 offset 的基础上加上当前消息的长度,然后才能获取到下一个消息的 offset。
消费者总是从一个特定的分区中顺序地消费消息,如果一个消费者确定了一个特定的消息偏移量,则默认在此前所有的偏移量的消息都被收到了。消费者实际上是无法异步拉取消息存在本地并整理成有序的,消费者必须向 broker 指定偏移量和要获取的数据长度,broker 本地会存有一个有序的 offset 列表,然后根据所需的 offset 和长度返回给消费者。消费者根据 offset 和当前消息的长度自己计算下一个要获取的 offset。
另一个不太常规的点是 kafka 尽量避免将消息缓存在内存中,其依赖底层文件系统的页面缓存,这可以避免双重缓存,消息只会缓存在页面缓存中,同时也减轻了 GC 的开销。
这里页面缓存的意思是操作系统会将常访问到的文件缓存在内存中,当应用程序请求文件时,如果该文件的数据页面在缓存中,则可以直接获取。由于页面缓存是系统级别的,所以即使进程重启,页面缓存中的消息仍然存在。而且生产者和消费者是顺序访问文件的,生产和消费的延迟也较小,可以很高效地利用操作系统的缓存。
broker 是无状态的,这样可以减少 broker 对消息的管理开销,但是也带来了问题,例如如何删除一个消息,broker 怎么知道所有订阅者都消费了那个消息呢?kafka 通过一个简单的超时机制来实现,当一个消息停留超过一个时间阈值,则删除,例如 7 天。这样的设计有另一个好处,消费者可以回头再消费一次之前的消息。当程序存在错误时,可以在修复之后重新消费一次,除此之外,如果消费者宕机,可以从新恢复,再消费没保存的那些数据。
消息副本
为了容错考虑,broker 中的每个分区可以有多个副本,其中每个分区都存在一个 leader,leader 会负责所有的写请求,读请求可以发送给 leader,也可以发送给 follower,follower 会不断地从 leader 中同步数据,尽可能达成一致。
当 leader 宕机时,会基于 zookeeper 从 follower 中选取出一个新的 leader,并生成一个新的副本,以此来实现副本数不减少。
仅有当前所有副本都已经将消息写入其日志中,才视为提交成功,才对消费者可见,这样就避免了 leader 宕机可能导致消息丢失的问题。对于生产者而言,可以选择等待消息提交,也可以不等待,取决于具体的应用场景。
分布式协调
每个生产者可以将消息推送到随机的分区或由分区键和分区函数确定的分区上。那么消费者如何消费数据呢?
kafka 有消费组的概念,每个消费组包含一个或多个消费者来消费一个订阅的 topic 集合,每个消息只会传递到消费组中的一个消费者。不同的消费组都是独立的,不需要考虑不同组之间的协调。目标是将存储在 broker 中的消息均匀地划分给消费者,同时不引入太多的额外开销。
我们第一个决定是对一个 topic 实现最小并行度单位的分区,这意味着任何时间,来自一个分区的所有消息只会被消费组中的一个消费者消费(可以有多个消费组),如果允许一个组内的多个消费者同时消费某个分区,他们必须协调谁消费什么消息,使用锁或状态来协调。在 kafka 中,只有消费者需要调整负载均衡时,才需要相互协调,但是这个操作并不频繁。为了真正实现负载均衡,一个 topic 的分区数量要远比一个消费组内的消费者多。
第二个决定是放弃 master 节点,而是让消费者自己协调。使用 master 设计需要担心 master 的宕机。kafka 使用 zookeeper 来检测 broker 和消费者的添加和删除;重新进行负载均衡,当上述情况发生。维护每个分区的消费 offset。
broker 向 zookeeper 注册主机名和端口号,和一系列 topic 和其分区。消费者注册其所在的消费组,消费组决定了该消费者应该消费哪些 topic。消费组会维护两个列表,一个是内部消费者和消费分区的映射,第二个是消费 offset。
如何看待基于磁盘的持久性
对磁盘不同的使用方法,可能会导致磁盘的速度慢的超出你的想象,也可能快的超出你的想象,在正确设计的情况下,磁盘可以提供比肩网络的速度,并不会成为一个瓶颈。
磁盘的顺序读写性能最多可以达到随机读写性能的上千倍,顺序读写通常更好预测,同时也能被操作系统优化得更好。
在磁盘中持久化数据,通常可以用到 B 树类似的数据结构,这是因为 B 树的查询时间复杂度可以达到 ,然而这类数据结构在硬盘中是随机访问的,即使查询次数更少,开销可能仍然很大。随着数据的增大,B 树的性能是超线性增长的。而线性存储的结构,虽然查询可能很慢,但是在面对大容量储存时,可以很容易地利用磁盘的大容量特性,单机数据量可以达到 TB 以上。同时也因为数据存在持久化的磁盘中,消费的内容可以留存,消费者可以消费很久之前的数据。
效率中的考量
有两个可能的因素导致效率变差,过多的小 IO 操作,过多的数据拷贝
为了避免过多的小 IO 操作,message set 的概念被提出来,这可以使多个消息打包成一个组一次发送,而不是很多小消息一个一个发送,生产者和消费者一次生产和消费一个组的数据,来避免频繁的小数据 IO。
通过这样的方式,更大的网络包、更大的顺序磁盘读写、更大的连续内存块,都可以使 Kafka 达到更高的效率。
对于数据拷贝,消息日志只是存储消息的文件名和目录,同时操作系统对于储存在 pagecache 中的数据发往网络有高度的优化,例如 Linux 中的 sendfile
,将四次拷贝的操作降低至两次。
Batch Compression
在一些情况下,瓶颈不出现在 CPU 或者磁盘上,而是在网络上,在这种情况下,Kafka 可以对一组数据实现高效的压缩,以减少网络带宽的压力。
broker 无状态的考量
虽然可以实现记录消费的数据,并删除被消费的数据,使得 broker 中的数据变少。但是让 broker 和 consumer 在是否消费这件事上达成一致有困难。
一个简单的方法是 broker 在发送消费数据之后就认为消费了,随后删除这个数据,但是如果 consumer 宕机或消息在网络中丢失,这个消息就永远不会再被消费了。一个简单的解决方法是让 consumer 在消费之后返回给 broker 一个 ack,broker 在收到这个 ack 之后才真正删除消息。
但是这样的方法也有问题,如果 consumer 消费这个数据之后,还来不及发送 ack 就宕机,那么这条消息在 broker 中仍然是未被消费的,最后这条消息就会被消费两次。另外,这样的机制使得 broker 要维护所有消息的状态,为系统带来了额外的复杂度。
Reference
[1] https://kafka.apache.org/documentation
[2] https://notes.stephenholiday.com/Kafka.pdf