Kafka架构原理图

Kafka概述

  • 为什么有消息系统

    **解耦**
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    
    **冗余**
    消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
    
    **扩展性**
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
    
    **灵活性 & 峰值处理能力**
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
    
    **可恢复性**
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
    
    **顺序保证**
    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
    
    **缓冲**
    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
    
    **异步通信**
    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
    
  • 核心概念
    Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

    kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。

  • Kafka集群架构

kafka集群架构

* producer
   消息生产者,发布消息到Kafka集群的终端或服务

* broker
  Kafka集群中包含的服务器,一个borker表示kafka集群中的一个节点

* topic
  每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。
  更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。

* partition
  每个 topic 包含一个或多个partition。Kafka分配的单位是partition

* replica
  partition 的副本,保障 partition 的高可用。

* consumer
  从Kafka集群中消费消息的终端或服务

* consumer group
  每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

* leader
  每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互

* follower
  Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。

* controller
  知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。

* zookeeper
  (1)    Kafka 通过 zookeeper 来存储集群的meta元数据信息
  (2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。

* offset
  消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
  kafka0.8 版本之前offset保存在zookeeper上。
  kafka0.8 版本之后offset保存在kafka集群上。

* ISR机制
  光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。
  ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
  • kafka基本命令行
# 1、启动,停止
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh

# 2、创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181

# 3、查询所有topic
kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 

# 4、查看topic描述信息
kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  

# 5、删除topic
kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 

# 6、模拟生产者写入数据
# 9092是 kafka中 conf/producer.properties  bootstrap.servers=localhost:9092地址
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 

# 7、模拟消费者消费数据
#  conf/consumer.properties  bootstrap.servers=localhost:9092地址
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
  • 生产者开发
//准备配置属性
Properties props = new Properties();
//kafka集群地址
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//acks它代表消息确认机制
props.put("acks", "all");
//重试的次数
props.put("retries", 0);
//批处理数据的大小,每次写入多少数据到topic
props.put("batch.size", 16384);
//可以延长多久发送数据
props.put("linger.ms", 1);
//缓冲区的大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//这里需要三个参数,第一个:topic的名称,第二个参数:表示消息的key,第三个参数:消息具体内容
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i));
producer.close();

  • 消费者开发
//准备配置属性
Properties props = new Properties();
//kafka集群地址
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//消费者组id
props.put("group.id", "test");
//自动提交偏移量
props.put("enable.auto.commit", "true");
//自动提交偏移量的时间间隔
props.put("auto.commit.interval.ms", "1000");
//默认是latest
//earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
props.put("auto.offset.reset","earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//指定消费哪些topic
consumer.subscribe(Arrays.asList("test"));
while (true) {
  //指定每个多久拉取一次数据
  ConsumerRecords<String, String> records = consumer.poll(100);
}
  • 手动提交偏移量
    //关闭自动提交,改为手动提交偏移量
  props.put("enable.auto.commit", "false");
  //定义一个数字,表示消息达到多少后手动提交偏移量
  final int minBatchSize = 20;
  if (buffer.size() >= minBatchSize) {
    //insertIntoDb(buffer); todo  拿到数据之后,进行消费
    consumer.commitSync();
    buffer.clear();
  }

分区策略

  • 指定具体分区
 // partition(指定存储在哪个分区)
public ProducerRecord(String topic, Integer partition, K key, V value) {
  this(topic, partition, null, key, value, null);
}
  • key.hashCode()
// 不指定分区,给定key值,通过key.hashCode() 分配到指定分区
public ProducerRecord(String topic, K key, V value) {
  this(topic, null, null, key, value, null);
}
  • 轮询方式
// 不指定分区,也不指定key的值,使用轮询方式依次存储到某个分区
public ProducerRecord(String topic, V value) {
  this(topic, null, null, null, value, null);
}
  • 自定义分区类
// 4. 类似于DefaultPartitioner,自定义分区类,实现Partitioner
public class MyPartitions implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0; // 返回放入哪个分区
    }
}
  • 源代码解析
// 下方是KafkaProducer<K, V>和DefaultPartitioner的源代码
    /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
  Integer partition = record.partition();
  // 1. 如果指定了partition则为partition指定值
  return partition != null ?
  partition :
  // 2. 没指定,则是调用以下方法,方法在下方
  partitioner.partition(
    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

/**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      // 1.获取分区数量
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
      // 2. key 为空时,采取轮询
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
              // 使用key的hash值选择一个partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

文件存储机制

  • 文件结构
1. Kafka文件存储在Topic中
2. 一个Topic中包含多个分区(对应文件目录下Topic名称 + 0-partitions.size())
    1) 每个partition下的文件被等分成多个数据文件,默认1G,每一个数据文件被分为一个段(segment file)
    2)每个segment file分为 .log 和.index timeindex两个文件
        1. log文件名称是当前数据的序号,存储数据信息
        2. index文件名称同上,记录消息的offset和所在log文件的position稀疏索引。
        3. timeindex 存储消息 timestrap和稀疏索引
3. 一个partition有多个副本replication,分布在相同或不同的kafka节点中


结论:一个partition中的数据是有序的吗?回答:间隔有序,不连续。

针对一个topic里面的数据,只能做到partition内部有序,不能做到全局有序。特别是加入消费者的场景后,如何保证消费者的消费的消息的全局有序性,
这是一个伪命题,只有在一种情况下才能保证消费的消息的全局有序性,那就是只有一个partition。

image-20200104154450871

  • message消息结构

参数说明:

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。
    这个就需要涉及到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
  • kafka优秀设计
    • 顺序写
一开始很多人质疑 kafka,大家认为一个架构在磁盘之上的系统,性能是如何保证的。这点需要跟大家解释一下,客户端写入到 Kafka 的数据首先是写入到操作系统缓存的(所以很快),然后缓存里的数据根据一定的策略再写入到磁盘,并且写入到磁盘的时候是顺序写,顺序写如果磁盘的个数和转数跟得上的话,都快赶上写内存的速度了!
  • PageCache
    为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:

(1)避免Object消耗:如果是使用Java堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
(2)避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。
  • 跳 表
在 kafka 的代码里,我们一个的 log 文件是存储是 ConcurrentSkipListMap 里的,是一个 map 结构,key 用的是文件名(也就是 offset),value 就是 log 文件内容。而 ConcurrentSkipListMap 是基于跳表的数据结构设计的。想要消费某个大小的 offset,可以根据跳表快速的定位到这个 log 文件了。
  • 稀疏索引
.index 存储稀疏索引
假设刚刚我们定位要消费的偏移量是在 00000000000000368769.log 文件里。
如何根据 index 文件定位呢?
(1)首先在 index 文件里找,index 文件存储的数据都是成对出现的,比如我们到的 1,0 代表的意思是,offset=368769+1=368770 这条信息存储的物理位置是 0 这个位置。那现在我们现在想要定位的消息是 368776 这条消息,368776 减去 368769 等于 7,我们就在 index 文件里找 offset 等于 7 对应的物理位置,但是因为是稀松索引,我们没找到,不过我们找到了 offset 等于 6 的物理值 1407。
(2)接下来就到 log 文件里读取文件的 1407 的位置,然后遍历后面的 offset,很快就可以遍历到 offset 等于 7(368776)的数据了,然后从这儿开始消费即可
  • 零拷贝
    • 零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
 Kafka利用linux操作系统的 "零拷贝(zero-copy)" 机制在消费端做的优化。
  • 首先来了解下数据从文件发送到socket网络连接中的常规传输路径
比如:读取文件,再用socket发送出去
传统方式实现:
先读取、再发送,实际经过1~4四次copy。
buffer = File.read 
Socket.send(buffer)
  * 第一步:操作系统从磁盘读取数据到内核空间(kernel space)的Page Cache缓冲区
  * 第二步:应用程序读取内核缓冲区的数据copy到用户空间(user space)的缓冲区
  * 第三步:应用程序将用户空间缓冲区的数据copy回内核空间到socket缓冲区
  * 第四步:操作系统将数据从socket缓冲区copy到网卡,由网卡进行网络传输

传统IO

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。

重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。

显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。

这种场景:是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。试想,如果读取磁盘的数据需要用程序进一步处理的话,必须要经过第二次和第三次数据copy,让应用程序在内存缓冲区处理。

sendfile

    此时我们会发现用户态“空空如也”。数据没有来到用户态,而是直接在核心态就进行了传输,但这样依然还是有多次复制。首先数据被读取到read buffer中,然后发到socket buffer,最后才发到网卡。虽然减少了用户态和核心态的切换,但依然存在多次数据复制。

如果可以进一步减少数据复制的次数,甚至没有数据复制是不是就会做到最快呢?
  • DMA

    • DMA,全称叫Direct Memory Access,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。如下图:

    access memory

    • DMA,则可以绕过CPU,硬件自己去直接访问系统主内存。如下图

    1577687862577

    • 回到本文中的文件传输,有了DMA后,就可以实现绝对的零拷贝了,因为网卡是直接去访问系统主内存的。如下图:

    零拷贝

    Kafka采用顺序读写、Page Cache、零拷贝以及分区分段等这些设计,再加上在索引方面做的优化,另外Kafka数据读写也是批量的而不是单条的,使得Kafka具有了高性能、高吞吐、低延时的特点。这样Kafka提供大容量的磁盘存储也变成了一种优点

Java的NIO提供了FileChannle,它的transferTo、transferFrom方法就是Zero Copy。

内核原理

ISR机制

    光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?
    不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。

    ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。

    如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺
  • HW&LEO原理

    • LEO
    last end offset,日志末端偏移量,标识当前日志文件中下一条待写入的消息的offset。举一个例子,若LEO=10,那么表示在该副本日志上已经保存了10条消息,位移范围是[0,9]。
    
    • HW
        Highwatermark,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。任何一个副本对象的HW值一定不大于其LEO值。
        小于或等于HW值的所有消息被认为是“已提交的”或“已备份的”。HW它的作用主要是用来判断副本的备份进度.
    
        下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。
    

    img

  leader持有的HW即为分区的HW,同时leader所在broker还保存了所有follower副本的leo

  (1)关系:leader的leo >= follower的leo >= leader保存的follower的leo >= leader的hw >= follower的hw
  (2)原理:上面关系反应出各个值的更新逻辑的先后
  • ==更新LEO的机制==

    • 注意
      • follower副本的LEO保存在2个地方
    (1)follower副本所在的broker缓存里。
    (2)leader所在broker的缓存里,也就是leader所在broker的缓存上保存了该分区所有副本的LEO。
    
    • 更新LEO的时机

      • follower更新LEO
      (1)follower的leo更新时间
          每当follower副本写入一条消息时,leo值会被更新
      
      (2)leader端的follower副本的leo更新时间
          当follower从leader处fetch消息时,leader获取follower的fetch请求中offset参数,更新保存在leader端follower的leo。
      
      • leader更新LEO
      (1)leader本身的leo的更新时间:leader向log写消息时
      
  • ==更新HW的机制==

    • follower更新HW

      follower更新HW发生在其更新完LEO后,即follower向log写完数据,它就会尝试更新HW值。具体算法就是比较当前LEO(已更新)与fetch响应中leader的HW值,取两者的小者作为新的HW值。
      
    • leader更新HW

      • leader更新HW的时机
      (1)producer 向 leader 写消息时
      (2)leader 处理 follower 的 fetch 请求时
      (3)某副本成为leader时
      (4)broker 崩溃导致副本被踢出ISR时
      
      • leader更新HW的方式
        当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值。
        这里的满足条件主要是指副本要满足以下两个条件之一:
        (1)处于ISR中
        (2)副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认值是10秒)
      

HW和LEO的更新

  • producer消息发送原理
    • producer核心流程概览

Producer流程分析

  • 1、ProducerInterceptors是一个拦截器,对发送的数据进行拦截

    ps:说实话这个功能其实没啥用,我们即使真的要过滤,拦截一些消息,也不考虑使用它,我们直接发送数据之前自己用代码过滤即可
    
  • 2、Serializer 对消息的key和value进行序列化

  • 3、通过使用分区器作用在每一条消息上,实现数据分发进行入到topic不同的分区中

  • 4、RecordAccumulator收集消息,实现批量发送

    它是一个缓冲区,可以缓存一批数据,把topic的每一个分区数据存在一个队列中,然后封装消息成一个一个的batch批次,最后实现数据分批次批量发送。
    
  • 5、Sender线程从RecordAccumulator获取消息

  • 6、构建ClientRequest对象

  • 7、将ClientRequest交给 NetWorkClient准备发送

  • 8、NetWorkClient 将请求放入到KafkaChannel的缓存

  • 9、发送请求到kafka集群

  • 10、调用回调函数,接受到响应

producer核心参数

  • 常见异常处理

    • 不管是异步还是同步,都可能让你处理异常,常见的异常如下:
    1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException
    
    2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可
    
    3)NetworkException:网络异常,重试即可
    我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了。
    
    • retries

      • 重新发送数据的次数
    • retry.backoff.ms

      • 两次重试之间的时间间隔
  • 提升消息吞吐量

    • buffer.memory
      • 设置发送消息的缓冲区,默认值是33554432,就是32MB
    如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
    
    • compression.type

      • producer用于压缩数据的压缩类型。默认是none表示无压缩。可以指定gzip、snappy
      • 压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
    • batch.size

      • producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。
      • 默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去
    如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。
    
    • linger.ms
      • 这个值默认是0,就是消息必须立即被发送
        一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kB,自然就会发送出去。
        但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
    
  • 请求超时

    • ==max.request.size==
      • 这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb
      • 这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M)
    • ==request.timeout.ms==
      • 这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒
      • 如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理

ACK参数

acks参数,其实是控制发送出去的消息的持久化机制的。

  • ==acks=0==

    • 生产者只管发数据,不管消息是否写入成功到broker中,数据丢失的风险最高
        producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了。
    你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出去,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。
    
  • ==acks=1==

    • 只要leader写入成功,就认为消息成功了.
        默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader.
    
  • ==acks=all,或者 acks=-1==

    • 这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知.
    这种方式数据最安全,但是性能最差。
    
  • ==如果要想保证数据不丢失,得如下设置==

    (1)min.insync.replicas = 2
        ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了。
    
    (2)acks = -1
        每次写成功一定是leader和follower都成功才可以算做成功,这样leader挂了,follower上是一定有这条数据,不会丢失。
    
    (3)retries = Integer.MAX_VALUE
        无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试。
        除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
    

重试乱序

  • max.in.flight.requests.per.connection
    • 每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息

broker核心参数

  • server.properties配置文件核心参数

    【broker.id】
    每个broker都必须自己设置的一个唯一id
    
    【log.dirs】
    这个极为重要,kafka的所有数据就是写入这个目录下的磁盘文件中的,如果说机器上有多块物理硬盘,那么可以把多个目录挂载到不同的物理硬盘上,然后这里可以设置多个目录,这样kafka可以数据分散到多块物理硬盘,多个硬盘的磁头可以并行写,这样可以提升吞吐量。
    
    【zookeeper.connect】
    连接kafka底层的zookeeper集群的
    
    【Listeners】
    broker监听客户端发起请求的端口号,默认是9092
    
    【unclean.leader.election.enable】
    默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader
    
    【delete.topic.enable】
    默认true,允许删除topic
    
    【log.retention.hours】
    可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了
    

consumer消费原理

Offset管理

​ 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。所以后来就是提交offset发送给内部topic:consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。

Coordinator

  • Coordinator的作用

        每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance.
        根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的.
        consumer group中的每个consumer刚刚启动就会跟选举出来的这个fconsumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
    
  • 如何选择哪台是coordinator

        首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。
        比如说:groupId,"membership-consumer-group" -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区,consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。
    

39 GroupCoordinator原理剖析

Rebalance策略

比如我们消费的一个topic主题有12个分区:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假设我们的消费者组里面有三个消费者。
  • range策略
range策略就是按照partiton的序号范围
    p0~3             consumer1
    p4~7             consumer2
    p8~11            consumer3
默认就是这个策略
  • round-robin策略
consumer1:    0,3,6,9
consumer2:    1,4,7,10
consumer3:    2,5,8,11

但是前面的这两个方案有个问题:
    假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3
    这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上

  • sticky策略
    最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer
的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

consumer1: 0-3
consumer2:  4-7
consumer3:  8-11 

假设consumer3挂了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11

consumer核心参数

【heartbeat.interval.ms】
默认值:3000
consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

【session.timeout.ms】
默认值:10000    
kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

【max.poll.interval.ms】
默认值:300000
如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

【fetch.max.bytes】
默认值:1048576
获取一条消息最大的字节数,一般建议设置大一些

【max.poll.records】
默认值:500条
一次poll返回消息的最大条数,

【connections.max.idle.ms】
默认值:540000
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收

【auto.offset.reset】
  earliest
        当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费          
    latest
        当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
    none
        topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
注:我们生产里面一般设置的是latest

【enable.auto.commit】
默认值:true
设置为自动提交offset

【auto.commit.interval.ms】
默认值:60 * 1000
每隔多久更新一下偏移量

官网查看kafka参数http://kafka.apache.org/10/documentation.html