Kafka是现在业界选择比较多的一种消息队列解决方案。我这里主要也是选Kafka作为消息队列这块需求的解决方案。下面的技术调研会先从消息队列本身开始,然后做一个横向比较,最后再聚焦到Kafka本身上。
调研用的Kafka版本为:
2.2.0
官方文档:
一本中文手册,版本较老,看看范例代码尚可:Apache kafka中文手册。
在聊Kafka之前,其实更需要关注下消息队列本身。什么需求下需要使用消息队列、消息队列的功能点有哪些、消息队列的模式有哪些、消息队列的实现难点有哪些,等等。如果什么都不了解,那技术选型就无从谈起了,也没办法理解为什么消息队列要做好是非常困难的一件事情。展开了说,也就没办法举一反三,理解其他软件系统的设计精髓了。
下面会简单列举下消息队列的一系列知识点,细节内容这里就不展开了,篇幅太大。
消息队列的需求:
消息队列的模型:
消息队列的投递模式:
消息队列的性能指标:
消息队列的投递策略:
消息队列的功能性:
几篇值得一读的博文:
一篇通论形式的消息队列设计博文,讲解了一些消息队列通用的知识点,值得一读。
本质上也是一篇对于消息队列进行设计和讲解的博文,但结合了美团公司内部的实际项目经验,对于大型系统的设计者有更好的贴近应用场景的指导作用。
接上一篇的博文,更多讲到了性能和高级功能相关的实现以及性能的优化。
排版比较粗糙,但内容非常不错。以贴近项目经验的内容讲解了RabbitMQ和Kafka的:
消息队列的选型一直都是比较困难的,业界现在比较主流的选择有:ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ 这几种。
有几篇不错的博文可以看下:
消息中间件选型分析:从 Kafka 与 RabbitMQ 的对比看全局
这篇时间还算比较接近2018年5月2日
,主要是以Kafka和RabbitMQ为范例进行了消息队列选型的要点讲解。虽然讲解的时候使用了Kafka以及RabbitMQ作为范例,但实际上讲解的是通用的消息队列选型要点。结合本文上一节所讲到的消息队列的一系列知识点,应该能有更深刻的理解。
滴滴出行基于 RocketMQ 构建企业级消息队列服务的实践
这篇时间稍微有点久,文中提到的Kafka版本还是0.8左右的(虽然文章的发布时间倒是不久之前),所以文中提到的性能指标之类的仅可作为参考。这篇文章是对Kafka和RocketMQ做的一个技术选型比较,文章中有相当多的性能指标比较,并结合滴滴公司的实际情况讲解了下消息队列系统进行替换时候所做的Migration工作,非常有借鉴价值。此外,文中还有一小节讲解了下RocketMQ使用的经验。
Why is Kafka pull-based instead of push-based?
为什么Kafka消息队列使用的是Pull投递模式。
Modern Open Source Messaging: NATS, RabbitMQ, Apache Kafka, hmbdc, Synapse, NSQ and Pulsar
老外写的一篇,提到了不少国内较少提到的开源项目,可以作为参考。
关于Kafka和后起新秀Pulsar的横向比较,可以看:
大略的要点:
老样子从基本的概念开始梳理:
broker
:kafka集群中的任意一台提供服务的物理服务器即被称为brokertopic
:消息队列的一个队列,一般做架构的时候按需求将不同的消息分别派送到不同的管道里,而其中的任意一个管道在kafka里就是一个topicpartition
:topic下的概念,任意一个topic根据启动时的设定,会分成多个partition,而每个partition会由集群安排在某一台物理机broker上,以此做到集群的横向扩展offset
:partition下的概念,topic被分为很多partition,而每个partition则都会存储消息,每个partition存储的消息都是从0开始进行标号,而这个每条消息一个的标号,即为offset;可以理解为消息在partition里的唯一id;不同的partition都是从0开始标号,因此offset只对其自身的partition生效,不可混用replica
:某个partition的完整备份(在接收写入请求时数据可能会落后于leader),用在集群的高可用服务上,同时也可以提供partition读操作的负载均衡producer
:向topic里输入消息数据的角色consumer
:从topic里消费消息数据的角色consumer group
:可扩展且具有容错性的consumer机制,多个consumer共享一个group id,组内的所有consumer一起来消费topic的所有partition;但一个partition仍旧只能由一个consumer进行消费;主要是为了解决consumer和partition配对balance的问题,将consumer做成组由集群自动进行平衡并分配到partition关于consumer group,更多的可以看:Kafka消费组(consumer group)。
一张图基本上就能说明问题:
四台
broker3个
partition:P1_leader
(broker1)、P2_leader
(broker2)、P3_leader
(broker4)P1_follower
(broker2、broker3)、P2_follower
(broker3、broker4)、P3_follower
(broker1、broker2)恢复
选举
产生,成为新的leaderKafka在分布式解决方案上也有痛点,主要是每个partition的offset问题。
为了性能,kafka将一个partition视作一个管道,拥有一个从0开始的offset。这样做的好处就是不再需要在每一个消息体上做metadata存放该消息是否被消费掉等信息,而是通过partition的offset来标记整个管道中的消息被消费到了哪里。消息体更简单,流程上也更简单,不会出现乱序的消费情况。但也正是因为这种设计,导致每个partition无法由多个consumer并发消费,每个partition绑定只能同时允许一个consumer消费。
因此,从机制上来说,kafka并不适合用来做consumer重消耗(CPU)类型的消息通道。虽然partition可以做很多,极端点来说可以做到和consumer实际需求1:1的配比,但超过适配当前需求的partition数量设置也会造成吞吐量的下降(参见:6.2.4 场景4)。
consumer和partition在进行消息是否被消费掉的确认行为上,有两种方式:
enable.auto.commit=true
auto.commit.interval.ms
来定,这种方式被称为at most once
enable.auto.commit=false
at least once
consumer.commitSync()
,手动通知partition更新offset关于partition和consumer之间的关系,更多可以看下:
How multiple consumer group consumers work across partition on the same topic in Kafka?
If you want to have more consumers than partitions and still have performance enhancement and process only each message once, then you should increase the number of partitions in the topics so that there are at least as many partitions as consumers. Often topics are created with 2 times as many partitions needed to start, just so more consumers can be added later if needed without having to repartition the topic.
1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果
以及自动提交相关:Understanding the ‘enable.auto.commit’ Kafka Consumer property。
如果因某些原因,之前设计的partition分片数量不够导致能够上工的consumer数量不够,且consumer的消费速率跟不上的时候,就会发生消息堆积。如果堆积的情况比较严重的话,就需要程序介入处理。
思路如下:
这样老的topic里的消息就全部分发到新的topic里了,且新的topic的partition数量很高,可以挂载很多consumer处理堆积的消息。
监控方面,datadog的几篇文章非常不错,值得一读,不过版本有点老了,文章是2016年的:
详细的指标列表及其含义,可以看这篇:Monitoring Kafka。
Prometheus的Exporter分为两块:
此外,还有一个confluent公司的grafana dashboard配置,没找到对应的labs dashboard地址,但该配置文件本身也有参考价值:kafka-confluent-platform/grafana-kafka-dashboard.json。
可参考的jmx_exporter配置:
实际运行范例参见:
Exporter方面,jmx_exporter和kafka_exporter同时使用,两者侧重不同。jmx_exporter主要用来进行JVM监控,针对java runtime自身的状态进行监控,而kafka_exporter则针对kafka运行状况进行监控(其实只要不怕麻烦自己处理dashboard,则jmx_exporter就完全足够了)。
jmx_exporter的部署有两种选择,一是独立的容器进行部署,好处是可观察性强,且不会对kafka容器造成影响,缺点是部署更繁杂,配置内容更多;二是修改kafka容器,添加几项配置项,就可以直接在kafka容器内启动一个java进程,进行metrics输出。这里实践使用的是第二种方法,配置修改如下:
kafka_1:
image: ...
...
volumes:
- /private/tmp/jmx_prometheus_javaagent-0.9.jar:/usr/local/bin/jmx_prometheus_javaagent-0.9.jar
- /private/tmp/jmx-kafka-2_0_0.yaml:/etc/jmx-exporter/jmx-kafka-2_0_0.yaml
- ...
networks:
...
ports:
- "19092:9092" # client port
- "17071:7071" # jmx prometheus metrics
expose:
- "9092" # client port
- "9093" # internal traffic
- "9991" # jmx
- "7071" # jmx prometheus metrics
restart: "always"
logging:
<<: *KAFKA_LOGGING_DEFAULTS
environment:
...
JMX_PORT: 9991
KAFKA_OPTS: "-javaagent:/usr/local/bin/jmx_prometheus_javaagent-0.9.jar=7071:/etc/jmx-exporter/jmx-kafka-2_0_0.yaml"
<<: *KAFKA_ENV_DEFAULTS
JMX_PORT
告知kafka在启动的时候需要一并启动jmx,KAFKA_OPTS
告知kafka在启动时一并需要启动jmx_exporter,而这个exporter的jar包以及配置文件则是通过bind mount放入容器内的。
配置文件主要来自于:kafka-grafana/jmx-exporter/kafka-2_0_0.yml。此外,因为使用通用jmx dashboard:JVM dashboard来进行数据展示,该配置文件中也放入了部分这个dashboard要求的数据转换规则:
lowercaseOutputLabelNames: true
rules:
# JMX common
- pattern: 'java.lang<type=OperatingSystem><>(committed_virtual_memory|free_physical_memory|free_swap_space|total_physical_memory|total_swap_space)_size:'
name: os_$1_bytes
type: GAUGE
attrNameSnakeCase: true
- pattern: 'java.lang<type=OperatingSystem><>((?!process_cpu_time)\w+):'
name: os_$1
type: GAUGE
attrNameSnakeCase: true
...
此外还有prometheus集群采集添加以及grafana dashboard导入,都是常规操作,就不多说了。
在之前的4.2 集群中,已经介绍过了Kafka的集群模式,可以看到Kafka天然就是分布式的,而且在设计之初就考虑到了集群的高可用。本文之前章节内关于集群的内容偏简单了,只介绍了大致的设计。如有需要更深入了解分布式和高可用的细节,则可以阅读唯品会的:Kafka 数据可靠性深度解读,这篇讲解非常透彻,基本上可以当经典来看了。
Kafka的高可用配置,有几个配置项控制了集群的安全程度。这里需要记住一点,整个集群越安全集群的吞吐量就越低,这是等价交换的,不可能不付出任何代价就获得数据的安全性。唯品会的文章讲解的细节非常多,但我们真正执行操作的时候可以只知其然,懂得哪几个配置项需要操作即可:
replication.factor
:每个partition会拥有多少个replicamin.insync.replicas
:partition的leader节点要求当前在线的replica节点的数量,如果实际在线数量少于这个数值,客户端的请求会被拒绝:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
unclean.leader.election.enable
:是否允许out-of-sync replica成为leadersync
拥有很好的安全性async
拥有更高的吞吐量,可以批量进行消息发送1(默认)
:partition的leader节点收取完成即反馈,如果在follower同步数据之前leader宕机则数据丢失0
:producer不等待kafka服务器反馈即直接进行后续发送,拥有最高的吞吐量以及最低的安全性-1
:partition的所有节点包括了leader以及所有的follower全部都确认数据同步完成再反馈,最高的安全性以及最低的吞吐量要保证数据写入到Kafka是安全的,高可靠的,需要如下配置:
这里的内容全部是转载自唯品会的:Kafka 数据可靠性深度解读。网上的资料很容易失效,因此在这里做下转载。
Benchmark的Kafka版本是:
0.10.1.0
Kafka broker 用到了 4 台机器,分别为 broker[0/1/2/3] 配置如下:
broker 端 JVM 参数设置:
-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999
客户端机器配置:
测试不同的副本数、min.insync.replicas 策略以及 request.required.acks 策略(以下简称 acks 策略)对于发送速度(TPS)的影响。
具体配置:
具体测试数据如下表(min.insync.replicas 只在 acks=-1 时有效):
测试结果分析:
下面将 partition 的个数设置为 1,来进一步确认下不同的 acks 策略、不同的 min.insync.replicas 策略以及不同的副本数对于发送速度的影响,详细请看情景 2 和情景 3。
在 partition 个数固定为 1,测试不同的副本数和 min.insync.replicas 策略对发送速度的影响。
具体配置:
测试结果如下:
测试结果分析:
副本数越高,TPS 越低(这点与场景 1 的测试结论吻合),但是当 partition 数为 1 时差距甚微。min.insync.replicas 不影响 TPS。
在 partition 个数固定为 1,测试不同的 acks 策略和副本数对发送速度的影响。
具体配置:
测试结果如下:
测试结果分析(与情景 1 一致):
测试不同 partition 数对发送速率的影响
具体配置:
测试结果:
测试结果分析:
partition 的不同会影响 TPS,随着 partition 的个数的增长 TPS 会有所增长,但并不是一直成正比关系,到达一定临界值时,partition 数量的增加反而会使 TPS 略微降低。
通过将集群中部分 broker 设置成不可服务状态,测试对客户端以及消息落盘的影响。
具体配置:
具体测试数据如下表:
出错信息:
测试结果分析:
测试单个 producer 的发送延迟,以及端到端的延迟。
具体配置:
测试数据及结果(单位为 ms):
实验使用的还是下载下来的Kafka,版本如头部申明的是2.2.0
。范例代码可以在Github查看:dist-system-practice/experiment/kafka/。
一般来说部署一个以上的docker容器组成集群,可以使用bash脚本的方法,自己编写命令一个个启动,但更好的方法是使用docker-compose
命令,读取配置文件,将整个集群按设定的模式启动起来。可运行范例可以查看:dist-system-practice/conf/dev/kafka-cluster.yaml。
启动脚本:dist-system-practice/bash/dev/docker_kafka.sh。
主要有几个点需要注意下。
services:
kafka_1:
image: "wurstmeister/kafka:2.12-2.2.0" # 一定需要
hostname: "kafka_1"
container_name: "kafka_1" # 使用这个名字来让同network的其他服务进行访问
depends_on:
- "zookeeper" # 保证启动先后顺序
volumes:
- /private/tmp/jmx_prometheus_javaagent-0.9.jar:/usr/local/bin/jmx_prometheus_javaagent-0.9.jar
- /private/tmp/jmx-kafka-2_0_0.yaml:/etc/jmx-exporter/jmx-kafka-2_0_0.yaml
- kafka_vol_1:/tmp/kafka/data
networks:
- "net"
ports:
- "19092:9092" # client port # 保证非同一network的其他服务也能访问,但这么做要求每个kafka service都监听不同的端口,相互不能冲突
- "17071:7071" # jmx prometheus metrics
expose:
- "9092" # client port
- "9093" # internal traffic # kafka集群内部流量走的端口号
- "9991" # jmx # jmx进程本身监听的地址,提供给jmx_exporter连接用
- "7071" # jmx prometheus metrics # jmx_exporter监听的地址,提供给prometheus抓取metrics
restart: "always"
logging:
driver: "json-file" # 日志输出以json格式,容易让聚合工具使用
options:
max-size: "512m" # 保证日志最大尺寸不会过大占用过多系统磁盘
environment:
KAFKA_LISTENERS: "INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092" # kafka的实际监听地址,一般不是0.0.0.0就是容器名
KAFKA_ADVERTISED_LISTENERS: "INSIDE://kafka_1:9093,OUTSIDE://127.0.0.1:19092" # kafka发布到zookeeper,让客户端获取并凭之连接kafka的地址,一定需要是客户端能访问到的地址,INSIDE部分供集群内部流量使用,OUTSIDE供客户端使用
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT" # 明确内部外部的通讯协议
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # 告诉kafka集群上面的INSIDE和OUTSIDE配置哪个才是供集群内部流量使用的,按语义一般是INSIDE
KAFKA_BROKER_ID: 0 # 必须每个kafka broker单独一个,不可重复
JMX_PORT: 9991 # jmx监听的地址
KAFKA_OPTS: "-javaagent:/usr/local/bin/jmx_prometheus_javaagent-0.9.jar=7071:/etc/jmx-exporter/jmx-kafka-2_0_0.yaml" # kafka在启动时一并会给予启动
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" # 告知kafka zookeeper的地址
KAFKA_LOG_DIRS: "/tmp/kafka-logs" # kafka实际的数据存放位置,推荐使用volume进行该地址的local映射
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 # kafka内部用来保存topic的offset的数据,其replication数量
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # kafka内部用来保存transaction state log的数据,其replication数量
KAFKA_MIN_INSYNC_REPLICAS: 1 # partition的leader节点要求当前在线的replica节点的数量
关于LISTENER
相关的几个配置(用于集群访问)更多的可以看下:8.2 对容器外应用提供服务。
参见:
在docker中部署集群有一个比较大的问题就是容器内部和外部的网络访问地址是隔离的,而kafka集群部分节点的相互发现是根据配置:KAFKA_ADVERTISED_LISTENERS
来进行通知的。
如果该配置内的地址填写的是127.0.0.1
这样的回环地址或容器外的IP,那么容器内各kafka节点之间的通讯就会有问题(因为容器内各节点拿到的配置地址都是容器外的IP)。而如果将该配置内的地址都配置成docker容器的名字,那么在容器内部的流量是没问题了,但外部访问kafka服务的应用程序拿到的地址则是容器的名字,就没法访问了。
所以对于需要向容器外部环境的应用提供服务的情况来说,需要做好几项配置的调整(这里说的配置都是kafka的配置,不涉及到zookeeper的配置,事实上集群部署的配置中zookeeper相关的内容非常简单):
KAFKA_LISTENERS: "INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092"
KAFKA_ADVERTISED_LISTENERS: "INSIDE://kafka_1:9093,OUTSIDE://127.0.0.1:19092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
kafka的配置中,现在可以将内部流量和外部流量的监听地址分离开来,在配置LISTENERS
相关的配置时,使用,
将INSIDE
和OUTSIDE
隔离开来即可。一般来说,INSIDE
的地址会配置成当前节点的容器名,或者就直接设置成0.0.0.0
。而OUTSIDE
则根据自身需求设置即可,这里的范例是给本机开发测试用,因此设置的是127.0.0.1
。
KAFKA_INTER_BROKER_LISTENER_NAME
配置项则是告诉kafka,集群各broker节点之间是使用INSIDE还是OUTSIDE配置作为集群内部流量的通讯地址。此外需要注意的是,该配置的INSIDE
和OUTSIDE
监听的端口是不可以重复
的,因为实际上kafka就是开了两个端口都在监听。
一般第一次在本地设置集群进行开发和调试的时候,这里是个大坑,非常麻烦。配置不正确就会发现kafka完全无法通讯。
一般来说,kafka service的配置不适合在配置文件中只设置一份,然后使用docker-compose scale的方法进行横向扩展。
适用scale的场景只有完全无状态,且各个节点之间的配置完全相同的情况,而kafka各节点一般最好设置自身的container_name
,然后在监听地址中使用正确的容器名作为地址,来进行相互通讯。
因为无法使用scale的缘故,必须手写大量的kafka service节点yaml配置,这倒也不算是大问题。关键的问题在于有大量重复的配置项,如果后面要修改,就很麻烦,怕漏改或者改错。
因此最好的方法是使用yaml的配置重用,来设置共通的配置,然后像变量一样重用。这可以参见我之前提到的范例配置文件:
x-kafka-environment-defaults: &KAFKA_ENV_DEFAULTS
...
# 在这里放默认的配置
...
services:
kafka_1:
...
environment:
...
<<: *KAFKA_ENV_DEFAULTS # 这样就把作为变量的配置全部导入到这个点上了
在进行测试的过程中,发现写入的速度比较慢,基本上每次写入在kafka集群这边会耗时1秒多,完全不能接受了。之前的博文中也提到了,集群安全性和写入性能是反比的,主要有几项因素:
后续进行了一些研究和测试。
集群共3个broker,topic分片如下:
Topic:work-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: work-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: work-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: work-topic Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
查了下官方配置,producer.acks
这个现在放在3.3 Producer Configs
里,命名为acks
(链接)。需要注意的是,这一项配置并不是服务端的配置项,而是连接上来的客户端producer所进行的设置,是否acks根据客户端producer的要求而变动。这很反直觉,我一开始一直都以为这是服务端的配置项,没想到居然是客户端的。
acks在kafka-go的使用,可以查看注释:kafka-go/writer.go,默认是-1,也就是最安全但最慢的那种。
此外,写入模式是同步还是异步也是客户端设置,见:kafka-go/writer.go。
在上面提到的几项要素中,集群规模和partition数量之类都是业务要求,一般不会轻易变动,所以测试主要是实验在不同写入模式和acks情况下的性能变动。
测试:
同步 -1 1秒+
同步 1 1秒+
同步 0 1秒+
异步 -1 0.2ms 几乎瞬间完成
异步 1 0.2ms 几乎瞬间完成
异步 0 0.2ms 几乎瞬间完成
结论:性能只在于是否将客户端设置成异步写入,acks对性能的影响微乎其微。
但上述的测试是在MAC本地环境进行的,有几项因素不够稳定:
因此上述测试的结论主要是参考价值,不能作为最终结论。如果后续需要严谨结论的话,需要在真实机器上测试,并需要开启go的profiling,观察下kafka类库中的写入到底是怎么完成的,以及kafka集群的响应情况。此外,即便是模拟集群1秒左右的同步写入速度也实在是太慢了,后续还需要深入profiling,到底这1秒做了什么(无论是kafka还是go客户端)。
EOF