Kafka基础介绍之consumers

我们在前面的《Kafka基本架构和概述》中知道Kafka最基本的两个组成部分就是producer和Consumers,producer已经在之前的《Kafka基础介绍之Producer》中介绍过了,本文就来继续和大家详细聊一聊Consumers的概念。

Consumers和Consumer Groups

假如我们有一个应用需要从Kafka中读数据,并且把它写到一个data store中,我们首先就需要创建一个consumer的object,然后subscribe到对应的topic中,然后就可以开始接收数据了。这就是最基本的consumer形式,但是假如我们的数据写入的速度很快,而一个consumer处理不过来,这个时候就需要多个consumer一起来接收数据,我们把它称之为consumer group。同一个consumer group中的不同consumer只能接受不同的partition数据。

我们用下面这个图来详细介绍一下,Topic T1有四个partition,我们开始只用一个consumer来进行读,如下所示:

随着时间推移,我们发现一个consumer处理不了这么多数据,可以在同一个consumer group中加入更多的consumer,这个时候我们有了两个consumer,consumer 1负责处理partition 0和partition 1的数据, consumer 2负责处理partition2和partition 3的数据。

我们甚至可以给每个不同的partition都安排一个consumer,如下所示:

我们上面有提到同一个consumer group中的不同consumer必须接受不同的partition的数据,所以这个时候哪怕你有更多的consumer,也不能同时处理同一个partition的数据,这也就意味着,当consumer数目大于partition的时候,就有一些consumer处于ideal的状态,如下所示, consumer 5就是处于ideal的状态。

从这个角度来看,在scale的时候,我们可以增加consumer,但是有效的consumer的数目又被partition的数目所限制,这就是为什么我们通常会看到一个topic有就很多partition,也是为了方便最终增加consumer来进行scale。

另外一个很常见的情况就是我们有多个应用都需要从同一个topic中读取数据,这个时候就可以创建多个consumer group,不同consumer group中的consumer是可以访问同一个partition的,如下所示,我们引入了consumer group2:

Rebalance

在我们理解了上面Consumer group的概念之后,你也许会问假如我们在Consumer group中加入一个新的consumer,或者一个原有consumer crash了会发生什么?这是一个很好的问题,当有上面的事情发生后,我们会把partition的owner进行transfer,比如把crash的consumer所对应的partition重新分配给别的还active的consumer,我们称这个过程为rebalance。正是因为有了这些rebalance的存在,我们才能够保证高可靠性和扩展性。

通常来说rebalance的方式有下面两种:

Eager rebalance:这种方法比较绝对,就是当我们要做rebalance的时候,所有的consumer都停止,然后重新join consumer group并进行partition的分配,最终恢复数据的consume。这种方式就意味着中间其实是有一个完全不consume的时间的,也就是停止consume的时间。整体时间线如下所示:

Cooperative rebalance:这种方式就是我们通过consumer group的leader把有影响的partition停止了,其它partition还在继续consumer,对有影响的partition再进行重新分配,有时为了达到最终的均衡,这个过程可能会发生好几次才最终stable,如下图所示,在第一个rebalance过程中,我们把partition3停止了,这个时候1和2还是被正常consume的。然后在第二个rebalance的过程中,partition3重新被assign给consumer C。所以这种方式相比于Eager来说,它不会出现全部停止的情况。

这个时候也许你会问,我们怎么才能知道有一个新的consumer加入或者已有的consumer crash了,并需要进行rebalance呢?方法其实也很简单,就是我们让consumer不停地发送heartbeat,当一段时间没有收到对应consumer的heartbeat信息的时候就会认为它出问题了,从而就可以进行rebalance的操作。

假如你的应用希望哪怕consumer停止一段时间,它负责的partition也不会重新rebalance(比如说你的应用不在意延时consumer,然后有一些cache的策略等等)。这个时候你可以把consumer设置成为一个static的member,这个static的member不会进行rebalance直到session timeout。一般来说当你把session timeout设置得比较大的时候,就可以保证即使停止一段时间,也不会进行rebalance。

创建consumer

和创建producer类似,我们首先也需要创建一个property,然后通过该它来创建对应的consumer,简单的代码如下所示:

这个代码没有什么特别需要说明的,在创建了consumer之后,我们还需要把这个consumer subscribe到对应的topic (“customerCountries”):

假如你想subscribe不同的topic,你甚至可以使用一个正则表达式来表示:

在subscribe之后,一个最重要的事情就是循环地读取topic的数据,我们一般可以使用如下所示的poll loop:

一般来说,consumer会不停地读取数据,所以我们会用while(true)这样的代码,然后这段代码最重要的函数就是这个poll(timeout)了,它会不停的poll数据,假如没有数据就会等待直到timeout。当然这个poll函数还有一些别的功能,比如你第一次调用它的时候,他会发现GroupCoordinator,并且join consumer group,接受它assign的partition等待。后面的rebalance都会通过它来体现,所以可以说它会处理所有和consumer相关的操作。

一般来说为了thread safe每个consumer group中的consumer必须有其单独的一个thread来进行处理。

Consumer的设置

和之前的Producer类似,Consumer property也有很多设置来实现各种功能,我们简单罗列一些重要的设置如下:

Fetch.min.bytes

Consumer利用这个设置来告诉Kafka一次允许的最小数据大小,说白了就是至少有了这么多数据再发送给我,这样一来不会一有数据就返回,也就有效降低了consumer端的压力。当然从另外一个角度来说,这个值设置得比较大的时候就会导致consumer读的延时,所以你可以根据你的应用的情况来具体设置。另外这个设置还需要和下面的max wait 的时间来一起看。

Fetch.max.wait.ms

我们上面说你至少有了那么多数据再发送给我,假如一直就只有一点点数据是不是就一直不发送呢?显然是不可能的,所以就有了这个设置,它的意思是最长的等待时间,这个时间一到,哪怕你没有足够的数据满足min.bytes,你也需要发送数据回来。

Fetch.max.bytes

这个也比较好理解,和min.bytes是相反的,就是一次最多只能返回这么多数据给consumer。

Max.poll.records

和max.bytes比较类似,前者限制的数据大小,这个限制的record的数目。

Session.timeout.ms以及heartbeat.interval.ms

这两个参数我们在前面的rebalance中也有提及,session timeout用来表示一个consumer多长时间没有heartbeat,我们才认为它出问题了(默认是10s)。Heartbeat.interval.ms用来表示每次heartbeat发送的间隔。

Max.poll.interval.ms

在consumer上面其实有两个thread一个是在poll函数调用的这个层级,另外一个是heartbeat(background thread)来发送heartbeat,有一种可能就是后台的heartbeat还是好的,但是前台的poll thread出问题了,这种情况下我们也需要有一定的机制来探测,而这个参数就是用来设置前台thread有多久没有调用poll函数,我们就认为consumer出问题了。

Auto.offset.reset

这个参数主要用来设置当consumer没有设置offset或者设置的offset无效的情况下如何进行处理。默认值是”latest”,也就是说没有设置offset就从最新的开始。另外一个设置是”earliest”,顾名思义,就是从最早的数据来进行读取。你也可以设置成”none”,会直接报错。

Enable.auto.commit

这个参数用来设置consumer是否会自动commit。我们会在后面专门写一篇文章来介绍consumer的commit。

Partition.assignment.strategy

这个参数还比较有意思,我们知道不同的partition可以分配给同一个consumer group中的不同consumer,而这个分配的算法就是由这个设置来决定,它主要有下面几种方法:

  1. Range: 这种设置下,每个consumer都会被分配一个topic中连续的partition。比如说有两个topic T1和T2,每个topic下面有三个partition 0,1,2,然后有两个consumer C1和C2.那么这个分配的结果就是T1的0,1以及T2的0,1会分配给C1,T1和T2的2会分配给C2。所以这种策略下如果数据正好无法平分,第一个consumer总是会被多分。
  2. RoundRobin:这个很好理解,就是大家一个一个分,一轮结束继续下一轮。所以这种策略下上面的例子的结果会是:T1的0,2以及T2的1分配给C1。T1的1以及T2的0,2会分配给C2。所以通常假如我们的consumer只subscribe到一个topic的时候,这种策略是最balance的。
  3. Sticky:这种策略下的思想首先要尽量balance,然后假如有rebalance出现,尽量少地在consumer之间move partition。所以开始就和round robin类似,但是有rebalance出现的时候会尽量少move partition到不同的consumer,它在一个consumer group subscribe到不同的topic的时候会比round robin更加平衡一点。
  4. Cooperative sticky:和sticky类似只是说支持了cooperative的rebalance。

总结

至此本文就把Kafka中consumer相关的基本概念介绍完毕了,希望对大家有所帮助。

You may also like...

2 Responses

  1. December 3, 2021

    […] 我们在《Kafka基础介绍之Consumer》中有提到可以使用poll函数来获取消息,而每次调用poll函数的时候,它返回的其实是Kafka中你这个consumer所在group还没有读的message。那这个还没有读是通过什么来判断的呢?它在Kafka中是通过offset来决定的,本文就来详细和大家聊聊Kafka中是如何通过offset来进行commit的。 […]

  2. January 23, 2022

    […] 一个Consumer Group可以consume from 一个topic的多个partition吗?可以的,可以参考这篇文章:Kafka基础介绍之consumers […]

Leave a Reply

Your email address will not be published.