博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka 概览
阅读量:6421 次
发布时间:2019-06-23

本文共 14690 字,大约阅读时间需要 48 分钟。

hot3.png

与传统的mq区别

吞吐量

高吞吐是kafka需要实现的核心目标之一,为此kafka在消息写入,消息保存和消息发送的三个阶段均做了优化:

  1. 消息写入阶段
  • 数据批量发送
  • 数据压缩
  1. 消息保存阶段
  • 数据磁盘持久化:直接使用linux 文件系统的cache,来高效缓存数据。
  • zero-copy:减少IO操作步骤,传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次
  1. 消息发送阶段
  • Topic划分为多个partition,提高并行度

负载均衡

负载均衡主要包括消息的分割及消息的备份以及broker和consumer的动态加入和离开。

  • producer根据用户指定的算法,将消息发送到指定的partition
  • 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
  • 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
  • 通过zookeeper管理broker与consumer的动态加入与离开

拉取系统

由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:

  • 简化kafka设计
  • consumer根据消费能力自主控制消息拉取速度
  • consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等

可扩展性

当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。

img

kafka名词解释

  • Producer :消息生产者,就是向kafka broker发消息的客户端。
  • Consumer :消息消费者,向kafka broker取消息的客户端
  • Topic :topic可以理解为数据库的一张表或者文件系统里面的一个目录。
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个partition只能被GC里面的一个消费者消费,不同消费者组可以消费同一个partition。
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition:一个topic分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。

Broker Leader的选举

Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。

Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR(除了controller之外其他broker节点信息),在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

Consumer

Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己维护。

Consumergroup

各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组

如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了,大家都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念

新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message

最优的设计就是,consumer group下的consumer thread的数量等于partition数量,一个consumer thread处理一个partition,这样效率是最高的。

如果producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增加topic下的partition,同时增加这个consumer group下的consumer。

Consumer Rebalance

触发条件:消费者数量或者broker数量发生变化时

  • Consumer增加或删除会触发 Consumer Group的Rebalance
  • Broker的增加或者减少都会触发 Consumer Rebalance

Topic & Partition

Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量] )。物理上存储上,这个topic会分成一个或多个partition,每个partiton相当于是一个子queue。在物理结构上,每个partition对应一个物理的目录(文件夹),文件夹命名是[topicname][partition][序号],一个topic可以有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在创建Topic时通过参数指定parittion数量。Topic创建之后通过Kafka提供的工具也可以修改partiton数量。

一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用。

Partition&Replica

每个partition可以在其他的kafka broker节点上存副本,默认的副本数为1,可通过default.replication.factor配置副本数,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是回来带资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。

  1. 怎样传送消息:producer先把message发送到partition leader,再由leader发送给其他partition follower。
  2. 在向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定
  3. 怎样处理某个Replica不工作的情况:如果这个部工作的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。如果这个不工作的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工作的partition replca写message成功,但是会等到time out,然后返回失败因为某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工作的partition replica从ack列表中移除,以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了。
  4. 怎样处理Failed Replica恢复回来的情况:如果这个partition replica之前不在ack列表中,那么启动后重新受Zookeeper管理即可,之后producer发送message的时候,partition leader会继续发送message到这个partition follower上。如果这个partition replica之前在ack列表中,此时重启后,需要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工作的partition replica的时候自动从ack列表中移除的)

Partition leader&follower

partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。

Broker

  • Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
  • Broker不保存订阅者的状态,由订阅者自己保存
  • Broker的无状态特点导致其需要定期删除数据以及当订阅者故障时需要从最小offset重新消费,也就是不能够保证exactly once

Consumer与topic关系

  • 每个group中可以有多个consumer,每个consumer属于一个consumer group;
  • 对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费
  • kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

img

Kafka消息的分发

Producer客户端负责消息的分发

  • kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/”partitions leader列表”等信息;
  • 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
  • 消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”,事实上,消息被路由到哪个partition上由producer客户端决定;

比如可以采用”random”“key-hash”“轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的。

  • 在producer端的配置文件中,开发者可以指定partition路由的方式。

message

Producer消息发送的应答机制

设置发送数据是否需要服务端的反馈,有三个值0,1,-1

0: producer不会等待broker发送ack

1: 当leader接收到消息之后发送ack

-1: 当所有的follower都同步消息成功后发送ack

kafka中的zookeeper

kafka在zookeeper中存储路径

/broker/ids/[0...N]  ## broker node注册/broker/topics/[topic]/partitions/[0...N]  ## topic 和 partitions/consumers/[group_id]/ids/[consumer_id] ## consumer注册/consumers/[group_id]/offsets/[topic]/[partition_id] ## 每个consumer group目前所消费的partition中最大的offset/consumers/[group_id]/owners/[topic]/[partition_id] ## 表明parttion的消费者

img

控制器与zookeeper关系

在老版本的kafka中没有控制器,都是由各个broker直接和zookeeper通信,这样造成资源浪费和zookeeper脑裂和羊群效应的发生,所以在新的版本中Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),controller和zoookeeper进行通信,监听partition,topic,broker等变化并更新zookeeper上的节点信息。

控制器在竞选成功后会初始化一个上下文,并将上下文信息同步给其他broker节点。对于新增事件,统一维护在线程安全的linkedblockingQueue中,然后按照FIFO的原则顺序的处理这些事件,维护多线程的安全。

消息可靠性

消息投递可靠性:

一个消息如何算投递成功,Kafka提供了三种模式:

  1. ack=0:第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
  2. ack=-1:第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
  3. ack=1:第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型

消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果broker不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。

消息消费的可靠性:

Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况。

文件存储机制

kafka文件存储优点:

  • 数据磁盘持久化:直接使用linux 文件系统的cache,来高效缓存数据。
  • zero-copy:减少IO操作步骤,传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次
  • 数据批量发送
  • 数据压缩
  • Topic划分为多个partition,提高并行度

从IO角度看kafka数据传输

实际场景中用户调整page cache的手段并不太多,更多的还是通过管理好broker端的IO来间接影响page cache从而实现高吞吐量

理想流程:

producer发送消息给broker—>broker将数据直接写入page cache中—>fllower broker从page cache中拉取数据—>consumer从page cache中拉取数据。

所以如果想提高IO的效率,就应该在数据未从page cache中删除之前去获取,从而避免走磁盘访问。具体有以下场景可能需要访问磁盘:

  • consumer消费速度过慢,在拉取数据时数据已经写入broker磁盘(具体写入时间由操作系统决定)
  • 老版本consumer,由于老版本consumer的消息格式和现有不同需要走JVM导致整个过程缓慢
  • 日志压缩,broker在定期做日志压缩时会消耗掉一定的IO和内存

page cache的调优

  • 设置合理(主要是偏小)的Java Heap size,Kafka对于JVM堆的需求并不是特别大,6~10GB大小的JVM堆是一个比较合理的数值
  • 调节内核的文件预取(prefetch):文件预取是指将数据从磁盘读取到page cache中,防止出现缺页中断(page fault)而阻塞。

topic中partition存储分布

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),

例如创建2个topic名 称分别为report_push、launch_info, partitions数量都为partitions=4,存储路径和目录规则为:

xxx/message-folder  // 数据文件存储根目录  |--report_push-0  // topic report_push partition 0 目录  |--report_push-1  |--report_push-2  |--report_push-3  |--launch_info-0  |--launch_info-1  |--launch_info-2  |--launch_info-3

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。消息发送时都被发送到一个topic,其本质就是一个目录

partiton中文件存储方式

  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
  • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

partiton中segment文件结构

每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。

  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
  • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

kafka5

segment中index<—->data file对应关系物理结构:

index : message序号,物理偏移地址

log:

kafka5

如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

  • 第一步通过二分法查找所在segment file

    上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。

    当offset=368776时定位到00000000000000368769.index|log

  • 再定位到的segnent index中顺序查找

    当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到 offset=368776为止

segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

消息格式

对于日志来说,一条记录以"\n"结尾,或者通过其它特定的分隔符分隔,这样就可以从文件中拆分出一条一条的记录,不过这种格式更适用于文本,对于Kafka来说,需要的是二进制的格式。所以,Kafka使用了另一种经典的格式:在消息前面固定长度的几个字节记录下这条消息的大小(以byte记),所以Kafka的记录格式变成了:

Offset MessageSize Message

消息被以这样格式append到文件里,在读的时候通过MessageSize可以确定一条消息的边界。

MessageSet

MessageSet是由多条记录组成的,而不是消息,这就决定了一个MessageSet实际上不需要借助其它信息就可以从它对应的字节流中切分出消息,而这决定了更重要的性质:Kafka的压缩是以MessageSet为单位的。将MessageSet压缩后作为另一个Message的value。Kafka的消息是可以递归包含的,也就是前边"value"字段的说明“Kafka supports recursive messages in which case this may itself contain a message set"。但是注意:Kafka中的一个Message最多只含有一个MessageSe否则会报:MessageSizeTooLargeException

具体地说,对于Kafka来说,可以对一个MessageSet做为整体压缩,把压缩后得到的字节数组作为一条Message的value。于是,Message既可以表示未压缩的单条消息,也可以表示压缩后的MessageSet。

从0.8.x版本开始到现在的1.1.x版本,Kafka的消息格式也经历了3个版本。

v0版本 对于Kafka消息格式的第一个版本,我们把它称之为v0,在Kafka 0.10.0版本之前都是采用的这个消息格式。注意如无特殊说明,我们只讨论消息未压缩的情形。

img

上左图中的“RECORD”部分就是v0版本的消息格式,大多数人会把左图中的整体,即包括offset和message size字段都都看成是消息,因为每个Record(v0和v1版)必定对应一个offset和message size。每条消息都一个offset用来标志它在partition中的偏移量,这个offset是逻辑值,而非实际物理偏移值,message size表示消息的大小,这两者的一起被称之为日志头部(LOG_OVERHEAD),固定为12B。LOG_OVERHEAD和RECORD一起用来描述一条消息。与消息对应的还有消息集的概念,消息集中包含一条或者多条消息,消息集不仅是存储于磁盘以及在网络上传输(Produce & Fetch)的基本形式,而且是kafka中压缩的基本单元,详细结构参考上图。

下面来具体陈述一下消息(Record)格式中的各个字段,从crc32开始算起,各个字段的解释如下:

  • crc32(4B):crc32校验值。校验范围为magic至value之间。
  • magic(1B):消息格式版本号,此版本的magic值为0。
  • attributes(1B):消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
  • key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
  • key:可选,如果没有key则无此字段。
  • value length(4B):实际消息体的长度。如果为-1,则表示消息为空。
  • value:消息体。可以为空,比如tomnstone消息。

v0版本中一个消息的最小长度(RECORD_OVERHEAD_V0)为crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是说v0版本中一条消息的最小长度为14B,如果小于这个值,那么这就是一条破损的消息而不被接受

v1版本

kafka从0.10.0版本开始到0.11.0版本之前所使用的消息格式版本为v1,其比v0版本就多了一个timestamp字段,表示消息的时间戳。

加时间戳的原因:在没有时间戳的情况下,移动replica后,replica的创建时间会时当前时间,会影响到log segment的删除策略,另外假如时间戳可以更好的支持流处理。

v1版本的magic字段值为1。v1版本的attributes字段中的低3位和v0版本的一样,还是表示压缩类型,而第4个bit也被利用了起来:0表示timestamp类型为CreateTime,而1表示tImestamp类型为LogAppendTime,其他位保留。v1版本的最小消息(RECORD_OVERHEAD_V1)大小要比v0版本的要大8个字节,即22B。

v2版本

kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。

配置文件server.properties

  • broker.id=0 : #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
  • port=19092 #当前kafka对外提供服务的端口默认是9092
  • host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
  • num.network.threads=3 #这个是borker进行网络处理的线程数
  • num.io.threads=8 #这个是borker进行I/O处理的线程数
  • log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,可以配置多个目录,以“,”分割,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
  • socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
  • socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
  • socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
  • num.partitions=1 #默认的分区数,一个topic默认1个分区数
  • log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
  • message.max.byte=5242880 #消息保存的最大值5M
  • default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
  • replica.fetch.max.bytes=5242880 #取消息的最大直接数
  • log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
  • log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
  • log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
  • zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

常用命令

1、启动kafka: ./bin/kafka-server-start.sh ./config/server.properties

2、创建主题:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3、查看主题: ./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

4、删除主题:./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic test(默认不进行删除,只是打上了删除标记。设置server.properties文件内“delete.topic.enable=true”,并且重启Kafka就可以了。)

5、发送消息:./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

6、接收消息:./bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

kafka优化思路

主要从4个方面考虑优化:吞吐量(throughput)、延时(latency)、持久性(durability)和可用性(availability)。“鱼和熊掌不可兼得”——你没有办法最大化所有目标。这4者之间必然存在着权衡(tradeoff)。常见的tradeoff包括:吞吐量和延时权衡、持久性和可用性之间权衡。但是当我们考虑整个系统时通常都不能孤立地只考虑其中的某一个方面,而是需要全盘考量。

partition

  • 使用随机分区
  • 增加partition

producer

producer端可通过配置batch大小,缓存大小,是否压缩和重试次数等来提高吞吐量

  • batch.size = 100000 - 200000,默认是16384,通常都太小了
  • compression.type = lz4,如果是低延迟则不压缩
  • acks = 1,如果是持久性,则配置为all
  • retries = 0,如果是持久性则配置相对较大的值,比如5 ~ 10默认为3
  • buffer.memory:如果分区数很多则适当增加 (默认是32MB)

consumer

  • fetch.min.bytes如果想达到高吞吐则配置为10-100,如果是低延迟则配置为1,默认为1
  • Consumer 的套接字缓冲区(socket buffers),以应对数据的高速流入,或者设置为-1,让底层根据网络情况自动设置
  • Consumers 应当使用固定大小的缓冲区,而且最好是使用堆外内存(off-heap),固定大小的缓冲区能够阻止 Consumer 将过多的数据拉到堆栈上,以至于 JVM 花费掉其所有的时间去执行垃圾回收,进而无法履行其处理消息的本质工作。
  • 在 JVM 上运行各种 Consumers 时,请警惕垃圾回收对它们可能产生的影响,长时间垃圾回收的停滞,可能导致 ZooKeeper 的会话被丢弃、或 Consumer group 处于再均衡状态。对于 Broker 来说也如此,如果垃圾回收停滞的时间太长,则会产生集群掉线的风险。

broker

  • num.replica.fetchers:如果发生ISR频繁进出的情况或follower无法追上leader的情况则适当增加该值,但通常不要超过CPU核数+1
  • num.recovery.threads.per.data.dir = log.dirs中配置的目录数

转载于:https://my.oschina.net/freelili/blog/3008959

你可能感兴趣的文章
设计一个算法,判断玩家是否赢了井字游戏
查看>>
Linux下*.tar.gz文件解压缩命令
查看>>
传统媒体、网络媒体的乐与悲!
查看>>
shell下操作快捷键
查看>>
Android流量统计TrafficStats类的使用(Android2.2后可用)
查看>>
Linux 命令用法
查看>>
我的友情链接
查看>>
Linux运维之linux下构建Cacti网络监控平台-清小小著作
查看>>
nohup命令
查看>>
陈松松:我是如何制定每一天的视频营销计划
查看>>
【我拼搏的2016】-Python进行时
查看>>
MySQL性能优化的最佳21条经验
查看>>
19.Swift中的闭包
查看>>
LI块级浮动margin或高度多出问题
查看>>
Android混淆打包
查看>>
提高企业内网服务器安全的五点建议
查看>>
第一个自己编写的面向对象的Python例子
查看>>
Horizon View 7 发布Win10桌面二:即时克隆桌面池配置
查看>>
mysql性能优化注意事项以及索引
查看>>
Java自带的线程池ThreadPoolExecutor详细介绍说明和实例应用
查看>>