Kafka进阶之请求处理流程介绍

我们知道Kafka的Client端可以发送各种请求给Broker,Broker在收到请求之后会进行相应的处理,然后返回response给对应的client,本文就来详细地聊一聊这个请求处理的过程。

概述

如我们上面提到,Client端会建立连接,然后发送request到broker,broker会进行处理再返回response到client。这里最基本的一个保证就是同一个client的不同request的处理是有序的,即先收到先处理。

整个请求的处理流程如下图所示:

简单来说,在broker中,当连接建立之后,会有一个Network的thread专门来接收从这个连接(client)发送过来的请求,当它收到请求之后,会把这个请求放到Request Queue中。然后I/O thread会不停地从Request Queue中读取请求并进行处理。在相应的请求处理完成之后,会把对应的response放到Response queue中,Network thread会从Response queue中来获取可以返回的response,如果有会把这个response返回给对应的client。

请求类型

Kafka的client端请求可以分为以下几类:

  1. Produce requests:顾名思义就是从produce发送过来的请求,包含了client要写到broker的message。
  2. Fetch requests:这个从consumer那边发送过来的请求,是用来从Kafka broker中读取message的。
  3. Admin requests:这个就是从admin client那边发送过来的metadata相关的操作,比如创建,删除topic等等。

我们在前面的《Kafka进阶之Replication》中提到,所有的produce请求(写请求)必须发送到leader partition,也就是说假如接受到produce请求的partition不是leader,它需要返回一个error response(一般来说是“not a leader for partition”的error message)。从另外一个角度来说,produce client是需要自行判断哪个是leader partition,然后再发送相关请求的。那么它是如何得到这个leader partition的信息的呢?

这里就引入了一个新的请求类型,它就是metadata request,当server端收到这个请求的时候,会返回topics中包含的partition,以及每个partition的leader等信息。这个request是可以发送给任何一个broker的,如下图所示:

所以总得来说,client会先发送metadata request到任何一个broker,得到partition的leader信息,然后再发送相应的produce request到leader partition。这里为了性能考虑,不可能每次发送produce request之前都去拿Metadata信息,因为一般来说Metadata信息更新的频率不会太高,所以我们直接cache这个信息,并定期进行refresh就可以了,当然如果某一个produce请求发送产生了错误,比如上面提到的“not a leader for partition”错误,我们也需要立即更新这个cache。

Produce request

我们在之前的《Kafka基础介绍之Producer》中详细介绍过,produce请求有一个重要的参数就是ack。简单回顾一下,就是当ack=1的时候,写请求只要在leader成功就可以返回了,当ack=all的时候,写请求需要在所有in-sync replica完成之后才能返回,或者ack=0,就是谁都不等。当我们回忆起了这个参数,我们先来看看当broker收到produce request之后需要进行的检查:

  1. 首先检查对应的client是否有写的权限。
  2. 检查acks的设置是否valid。(0,1或者all)
  3. 假如是all,那么还需要检查是否所有的all-sync replica数目满足最低的config值(min.insync.replicas)

当上面这些检查全部完成之后,broker就需要把message写到disk(可能会写到filesystem cache,何时flush到disk并不能保证,Kafka是通过replication来保证message durability的)。假如ack是0或者1的时候,broker就可以返回了。当ack是all的时候,请求会保存在一个称之为purgatory的buff中(我们后面会专门写一篇文章来介绍这个),直到leader发现所有的follower有了这个message才会返回给client。

Fetch request

Fetch request的处理和produce的request比较类似,唯一比较特殊的地方就是fetch会有一个offset的参数,这个参数用来表示从哪里开始获取message。所以broker在收到fetch request的时候需要先做这个参数的validate,假如请求的offset太老了,相关的message都被删除了,或者请求的offset还不存在,则需要返回对应的error response。假如offset是合理的,还有一个小的地方需要注意一下,这里有一个返回的limitation,就是最多返回多少的设置,不能一次性返回太多,在client端可能没法处理。

了解了上面的两点之后,Kafka在获取offset相关的message并进行返回的时候采用了一种称之为零拷贝(zero-copy)的技术,这种技术就是直接从file发送message到network channel,中间没有任何的buffer,这种技术可以去除中间的一次拷贝和memory的申请,从而提高了最终的性能。

对了,除了上面的设置最大可返回message的大小,我们还可以设置最小返回的message大小。这个主要用于做batch操作,如下图所示,等到有足够的message再返回(当然超时了也会返回)。

其他请求

除了我们上面提到的metadata request,produce request和fetch request。其实Kafka还有很多其它内部的一些请求,比如说一个controller声明某个partition有了一个新的leader,他会发送一个LeaderAndIsr的请求到新的leader。又比如我们在《Kafka基础介绍之消息commit》中提到的offsetCommitRequest等等。感兴趣的同学可以参考这个网址:https://kafka.apache.org/protocol.html#protocol_api_keys去看看所有的请求。

总结

本文详细介绍了Kafka的请求处理流程,相信有了这篇文章你会对Kafka的请求是如何在内部处理有所了解。也欢迎您继续阅读Kafka系列的相关文章,并给出宝贵意见。

You may also like...

Leave a Reply

Your email address will not be published.