Kafka进阶之Exactly once保证
Kafka在可靠性这一块有很灵活的配置,我们可以根据自己的应用需求来设置相应的参数,有时候我们希望Kafka不要丢失message,但是不在意是否有重复的message,这个时候也许at least once的保证就足够了。当然对于我们通常来说的producer产生message,consumer消费message的应用来说,处理重复message还是很容易的,比如通过一些unique的id来进行判断是否是重复的信息等,但对于stream的process,比如说计算一些信息的平均值,这种process就很难去一个个看是否有重复的message了,这个时候就希望Kafka能够保证不要有重复的信息,这也就是本文要来聊一聊的Exactly once保证。
我们来简单想一想,如果让你来做,怎么才能做到exactly once呢?聪明的你肯定可以想到在producer写的时候我们就不要有重复的message,那我们就先从这个方面来看看有没有什么好的方法解决。
幂等Producer
所谓的幂等就是假如我们执行一个操作很多次和执行一次得到的结果是一样的。这个很好理解,举个简单例子,x = x+1; 你执行多次和执行一次,x的结果是不同的。但假如是x=5;那么不管你执行几次,最终x的值都是5,所以它就是幂等的。
有了幂等的概念之后,就很容易想到假如我们能让producer也幂等就好了,这样就相当于一个producer的message就相当于只写入了一次。
有人也许会有疑问,一个producer怎么会写入多次呢?我们看一个简单的例子,比如说你写入了一个message到leader,然后leader会把这个message replicate到不同的followers,所有的followers都成功replicate了,leader准备发送成功的response给producer了,但就在这个时候leader crash了,所以这个producer的写操作就收到了一个timeout的错误,你为了保证这个message写入成功,就做了一次retry,很可惜因为之前的message其实已经写入成功了,所以你这次的retry就产生了一个不好的后果,那就是有了重复了message。也许你说那我不retry就好了,不retry你就会有丢失message的风险。所以有时为了保证at least once,就需要进行retry,而这在exactly once的世界里显然是不够的。
这个时候幂等producer的概念就出现了,它是如何实现的呢?其实idea也很简单,假如我们在写入message之前能够判断同样的message有没有被写入过就好了。要想实现这个功能,我们首先需要一个东西来判断两个message是否是同一个message,Kafka使用的是一个统一的producer ID加一个sequence number。这样就可以区分每个producer产生的message了。Kafka会默认保存收到的最近的5个message (每个partition),假如有新的message到来就可以进行比较,看是否已经接收过相应的message,假如已经收到过,就返回对应的error来reject这个message。如下图所示:
在这个图中,client(producer)发送pid=5,seq=0的message,然后一切ok,下一个message就会发送pid=5,seq=1的message,假如说broker收到了这个message,但是ack却因为各种原因没有能够返回给client端,这个时候client端会进行重试,它会再次发送pid=5,seq=1的message给broker,这个时候broker收到这个message就可以进行比较了,会发现原来这个message已经收到过了,所以就返回一个duplicate的response给client端,这样就可以解决重发的问题了。
那么Kafka中如何enable幂等producer呢?其实很简单,你只需要设置enable.idempotence=true即可。
Zombie的处理
现在假如producer fail了出现什么问题呢?一个producer fail之后,一般都会有一个新的producer来继续发送相关的message,但是这个新的producer会重新申请一个新的producer ID,也就是说这个时候假如我们发送同样的message,producer ID变了,当然sequence number也不会接着之前producer之后了,broker在收到message之后其实就很难再把这个message和之前的message进行区分了。
Kafka这里引入了一个transactional producer的概念,这是一个producer的config,同一个transaction中的每个producer会有一个相同的transactional.id,我们之前提到过producer虽然有producerId,但是这个producerId是随机分配的,所以出问题之后就很难区分是不是重复的message了。但是有了transactional.id之后,我们可以给producer设置成同一个tansactional.id,然后再维护一个transaction.id和producer.id的mapping关系,这样当一个producer挂了之后,新的producer因为有同样的transaction Id,我们就可以通过它来判断是否是同样的message。
这个时候还有一个问题就是新的producer运行之后,假如旧的有同样transaction id的producer回来之后怎么block这个producer的数据传输,其实在每次启动一个新的producer之后都会递增一个epoch的值,这样旧的回来之后它的epoch值比较小,所以会被拒绝。
Transaction
上面的幂等producer只是解决了一个写入的不重复问题,对于后续的处理,尤其是stream的处理则没有任何保证。Kafka中的transaction其实主要是为stream的process来服务的。所以它一般应用于“consume-process-produce”这样的pattern中。我们从一个例子来看看这里的transaction究竟可以解决什么问题。
假设我们有一个stream process的应用,它的功能也很简单就是从一个source topic读数据,然后处理这些数据,在把结果写到另外几个topic中。如下图所示,首先要从T1,T2,T3的topics中读取数据,然后进行process,并把它们写入到T4,T5,T6,然后有一个observer再读这几个topic。
我们来放大看这里的processor到observer,会发现这里其实一个到多个topics的写,我们的目标就是希望这几个topics的写是一个整体,要么写成功,要么写失败,对observer来说就像什么都没有发生。
一个最简单的思想如下图所示,就是我们在所有的topics的数据都写完成之后加一个mark(图中S所示),当observer看到这个mark的时候就表示它前面的数据是一起的,他们是一个transaction。所以observer可以等到它关心的topic上全部读到了这个mark,再进行一起处理。
正常情况下这个处理看起来是没有问题的,但是假如在某个中间时刻processor crash了该怎么办呢?如下图所示,这个时候我们可以看到几个topic中有了多个message还没有被mark,但是processor却crash了。
一个解决这个问题的方法就是当Processor重启之后,我们可以发送一个新的mark(图中R所示),用来告诉Observer这个mark和上次的S之间的数据都是不可用的,所以你直接丢弃就好了。
这里的S和R其实就是COMMIT和ABORT两种mark。我们下面的图会用C和A来代替他们。
这个时候我们再仔细一想,假如我们的Processor是在下图这种状态下crash的会发生什么?
这里和之前的Crash不一样的地方在于已经有一个Topic中有了Commit的mark,所以这个时候假如我们还按照原来的做法,在重启之后发送Abort的mark就会如下图所示:
你可以看到中间那个topic会有问题,因为我们只会认为C和A之间的数据是abort的,但是这里我们其实已经commit了之前的数据,它就相当于我们没有能够通过这个Abort的mark把之前的commit的数据去除掉。那怎么解决这个问题呢?
这里会引用two-phase commit的想法来进行解决,我们可以加入了一个transaction log的概念,每次想要commit和abort之前,先写入你想要做commit或者abort,然后再把相应的mark写到对应的数据topic,最后再把完成的commit或者abort写到transaction log中。
我们来看一个正常的情况,首先processor会往transaction log里面写一个ongoing的log,表示正在写各个topic的数据,当transaction完成的时候,准备写commit mark之前,会往transaction log中写入一条prepare commit的log,然后再开始写commit的mark,当所有的commit mark都写好了,会再插入一条complete commit到transaction log中。
在processor重启之后,它会首先看之前最后一次的操作是什么,像我们上面提到的case,其实它是写了topic2的commit但是别的commit还没有写的情况,那么在transaction log中的状态其实是prepare commit,当重启的processor看到这个log之后,就直接给所有的topic加一个commit的mark就好了,然后再写入complete commit到transaction log中。这样一来我们就解决了processor重启的问题了。
总结
本文大概从三个方面介绍了Kafka中是如何实现exactly once保证的,希望对你的学习有所帮助。
Recent Comments