消息隊列的性能好壞,其文件存儲機制設(shè)計是衡量一個消息隊列服務(wù)技術(shù)水平和最關(guān)鍵指標之一。下面將從Kafka文件存儲機制和物理結(jié)構(gòu)角度,分析Kafka是如何實現(xiàn)高效文件存儲,及實際應(yīng)用效果。
1.1 Kafka的特性:
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失
- 容錯性:允許集群中節(jié)點失敗(若副本數(shù)量為n,則允許n-1個節(jié)點失?。?/span>
- 高并發(fā):支持數(shù)千個客戶端同時讀寫
1.2 Kafka的使用場景:
- 日志收集:一個公司可以用Kafka可以收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統(tǒng):解耦和生產(chǎn)者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點擊等活動,這些活動信息被各個服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
1.3 Kakfa的設(shè)計思想
- Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節(jié)點一起去Zookeeper上注冊一個臨時節(jié)點,因為只有一個Kafka
Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節(jié)點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監(jiān)聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節(jié)點就會消失,此時所有的kafka
broker又會一起去Zookeeper上注冊一個臨時節(jié)點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節(jié)點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka
broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態(tài),并選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設(shè)置為-1,等待恢復(fù),等待ISR中的任一個Replica“活”過來,并且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka
controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。
這里曾經(jīng)發(fā)生過一個bug,TalkingData使用Kafka0.8.1的時候,kafka controller在Zookeeper上注冊成功后,它和Zookeeper通信的timeout時間是6s,也就是如果kafka controller如果有6s中沒有和Zookeeper做心跳,那么Zookeeper就認為這個kafka controller已經(jīng)死了,就會在Zookeeper上把這個臨時節(jié)點刪掉,那么其他Kafka就會認為controller已經(jīng)沒了,就會再次搶著注冊臨時節(jié)點,注冊成功的那個kafka
broker成為controller,然后,之前的那個kafka controller就需要各種shut down去關(guān)閉各種節(jié)點和事件的監(jiān)聽。但是當(dāng)kafka的讀寫流量都非常巨大的時候,TalkingData的一個bug是,由于網(wǎng)絡(luò)等原因,kafka controller和Zookeeper有6s中沒有通信,于是重新選舉出了一個新的kafka controller,但是原來的controller在shut down的時候總是不成功,這個時候producer進來的message由于Kafka集群中存在兩個kafka
controller而無法落地。導(dǎo)致數(shù)據(jù)淤積。
這里曾經(jīng)還有一個bug,TalkingData使用Kafka0.8.1的時候,當(dāng)ack=0的時候,表示producer發(fā)送出去message,只要對應(yīng)的kafka broker topic partition leader接收到的這條message,producer就返回成功,不管partition
leader 是否真的成功把message真正存到kafka。當(dāng)ack=1的時候,表示producer發(fā)送出去message,同步的把message存到對應(yīng)topic的partition的leader上,然后producer就返回成功,partition leader異步的把message同步到其他partition replica上。當(dāng)ack=all或-1,表示producer發(fā)送出去message,同步的把message存到對應(yīng)topic的partition的leader和對應(yīng)的replica上之后,才返回成功。但是如果某個kafka
controller 切換的時候,會導(dǎo)致partition leader的切換(老的 kafka controller上面的partition leader會選舉到其他的kafka broker上),但是這樣就會導(dǎo)致丟數(shù)據(jù)。
- Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer
group )中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer
線程)消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer
group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)并發(fā)處理message,這是因為多個BET去消費一個Queue中的數(shù)據(jù)的時候,由于要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for
update),這就導(dǎo)致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數(shù)量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業(yè)務(wù)都需要這個topic的數(shù)據(jù),起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發(fā)揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
當(dāng)啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer
thread,那么這個consumer thread也會去消費所有的partition。因此,最優(yōu)的設(shè)計就是,consumer group下的consumer thread的數(shù)量等于partition數(shù)量,這樣效率是最高的。
同一partition的一條message只能被同一個Consumer Group內(nèi)的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。
一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當(dāng)consumer group里面的consumer數(shù)量小于這個topic下的partition數(shù)量的時候,如下圖groupA,groupB,就會出現(xiàn)一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果consumer
group里面的consumer數(shù)量等于這個topic下的partition數(shù)量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當(dāng)consumer group里面的consumer數(shù)量大于這個topic下的partition數(shù)量的時候,如下圖GroupD,就會有一個consumer thread空閑。因此,我們在設(shè)定consumer
group的時候,只需要指明里面有幾個consumer數(shù)量即可,無需指定對應(yīng)的消費partition序號,consumer會自動進行rebalance。
多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復(fù)消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復(fù)消費message)
- Consumer Rebalance的觸發(fā)條件:(1)Consumer增加或刪除會觸發(fā) Consumer Group的Rebalance(2)Broker的增加或者減少都會觸發(fā) Consumer
Rebalance
- Consumer: Consumer處理partition里面的message的時候是o(1)順序讀取的。所以必須維護著上一次讀到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery
gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經(jīng)+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應(yīng)就會比較慢的,需要等處理完才行。
一般情況下,一定是一個consumer group處理一個topic的message。Best Practice是這個consumer group里面consumer的數(shù)量等于topic里面partition的數(shù)量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group里面consumer的數(shù)量小于topic里面partition的數(shù)量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic里面的所有partition都會被處理到的。。如果這個consumer
group里面consumer的數(shù)量大于topic里面partition的數(shù)量,多出的consumer thread就會閑著啥也不干,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能被兩個consumer thread去處理。所以我們線上的分布式多個service服務(wù),每個service里面的kafka consumer數(shù)量都小于對應(yīng)的topic的partition數(shù)量,但是所有服務(wù)的consumer數(shù)量只和等于partition的數(shù)量,這是因為分布式service服務(wù)的所有consumer都來自一個consumer
group,如果來自不同的consumer group就會處理重復(fù)的message了(同一個consumer group下的consumer不能處理同一個partition,不同的consumer group可以處理同一個topic,那么都是順序處理message,一定會處理重復(fù)的。一般這種情況都是兩個不同的業(yè)務(wù)邏輯,才會啟動兩個consumer group來處理一個topic)。
如果producer的流量增大,當(dāng)前的topic的parition數(shù)量=consumer數(shù)量,這時候的應(yīng)對方式就是很想擴展:增加topic下的partition,同時增加這個consumer group下的consumer。
- Delivery Mode : Kafka producer 發(fā)送message不用維護message的offsite信息,因為這個時候,offsite就相當(dāng)于一個自增id,producer就盡管發(fā)送message就好了。而且Kafka與AMQ不同,AMQ大都用在處理業(yè)務(wù)邏輯上,而Kafka大都是日志,所以Kafka的producer一般都是大批量的batch發(fā)送message,向這個topic一次性發(fā)送一大批message,load
balance到一個partition上,一起插進去,offsite作為自增id自己增加就好。但是Consumer端是需要維護這個partition當(dāng)前消費到哪個message的offsite信息的,這個offsite信息,high level api是維護在Zookeeper上,low level api是自己的程序維護。(Kafka管理界面上只能顯示high level api的consumer部分,因為low level api的partition offsite信息是程序自己維護,kafka是不知道的,無法在管理界面上展示
)當(dāng)使用high level api的時候,先拿message處理,再定時自動commit offsite+1(也可以改成手動), 并且kakfa處理message是沒有鎖操作的。因此如果處理message失敗,此時還沒有commit offsite+1,當(dāng)consumer thread重啟后會重復(fù)消費這個message。但是作為高吞吐量高并發(fā)的實時處理系統(tǒng),at least once的情況下,至少一次會被處理到,是可以容忍的。如果無法容忍,就得使用low level api來自己程序維護這個offsite信息,那么想什么時候commit
offsite+1就自己搞定了。
- Topic & Partition:Topic相當(dāng)于傳統(tǒng)消息系統(tǒng)MQ中的一個隊列queue,producer端發(fā)送的message必須指定是發(fā)送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行l(wèi)oad balance,均勻的分布在這個topic下的不同的partition上(
hash(message) % [broker數(shù)量] )。物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當(dāng)于是一個子queue。在物理結(jié)構(gòu)上,每個partition對應(yīng)一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic可以有無數(shù)多的partition,根據(jù)業(yè)務(wù)需求和數(shù)據(jù)量來設(shè)置。在kafka配置文件中可隨時更高num.partitions參數(shù)來配置更改topic的partition數(shù)量,在創(chuàng)建Topic時通過參數(shù)指定parittion數(shù)量。Topic創(chuàng)建之后通過Kafka提供的工具也可以修改partiton數(shù)量。
一般來說,(1)一個Topic的Partition數(shù)量大于等于Broker的數(shù)量,可以提高吞吐率。(2)同一個Partition的Replica盡量分散到不同的機器,高可用。
當(dāng)add a new partition的時候,partition里面的message不會重新進行分配,原來的partition里面的message數(shù)據(jù)不會變,新加的這個partition剛開始是空的,隨后進入這個topic的message就會重新參與所有partition的load balance
- Partition Replica:每個partition可以在其他的kafka broker節(jié)點上存副本,以便某個kafka broker節(jié)點宕機不會影響這個kafka集群。存replica副本的方式是按照kafka broker的順序存。例如有5個kafka broker節(jié)點,某個topic有3個partition,每個partition存2個副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數(shù)目不能大于kafka
broker節(jié)點的數(shù)目,否則報錯。這里的replica數(shù)其實就是partition的副本總數(shù),其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內(nèi)數(shù)據(jù)依然是完整的。但是,replica副本數(shù)越高,系統(tǒng)雖然越穩(wěn)定,但是回來帶資源和性能上的下降;replica副本少的話,也會造成系統(tǒng)丟數(shù)據(jù)的風(fēng)險。
(1)怎樣傳送消息:producer先把message發(fā)送到partition leader,再由leader發(fā)送給其他partition follower。(如果讓producer發(fā)送給每個replica那就太慢了)
(2)在向Producer發(fā)送ACK前需要保證有多少個Replica已經(jīng)收到該消息:根據(jù)ack配的個數(shù)而定
(3)怎樣處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中,就是producer在發(fā)送消息到partition leader上,partition leader向partition
follower發(fā)送message沒有響應(yīng)而已,這個不會影響整個系統(tǒng),也不會有什么問題。如果這個不工作的partition replica在ack列表中的話,producer發(fā)送的message的時候會等待這個不工作的partition replca寫message成功,但是會等到time out,然后返回失敗因為某個ack列表中的partition replica沒有響應(yīng),此時kafka會自動的把這個部工作的partition replica從ack列表中移除,以后的producer發(fā)送message的時候就不會有這個ack列表下的這個部工作的partition
replica了。
(4)怎樣處理Failed Replica恢復(fù)回來的情況:如果這個partition replica之前不在ack列表中,那么啟動后重新受Zookeeper管理即可,之后producer發(fā)送message的時候,partition
leader會繼續(xù)發(fā)送message到這個partition follower上。如果這個partition replica之前在ack列表中,此時重啟后,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現(xiàn)某個部工作的partition replica的時候自動從ack列表中移除的)
- Partition leader與follower:partition也有l(wèi)eader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其他的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition
leader所在的broker節(jié)點宕機,zookeeper會沖其他的broker的partition follower上選擇follower變?yōu)閜arition leader。
- Topic分配partition和partition replica的算法:(1)將Broker(size=n)和待分配的Partition排序。(2)將第i個Partition分配到第(i%n)個Broker上。(3)將第i個Partition的第j個Replica分配到第((i
+ j) % n)個Broker上
- 消息投遞可靠性
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都不管,發(fā)送出去就當(dāng)作成功,這種情況當(dāng)然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當(dāng)Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據(jù)應(yīng)用特性選擇,絕大多數(shù)情況下都會中和可靠性和性能選擇第三種模型
消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數(shù)據(jù)不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據(jù)實際情況配置。
消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當(dāng)消息消費后consumer掛掉,offset沒有即時寫回,就有可能發(fā)生重復(fù)讀的情況,這種情況同樣可以通過調(diào)整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務(wù)解決,但是如果你的應(yīng)用不在乎重復(fù)消費,那就干脆不要解決,以換取最大的性能。
- Partition ack:當(dāng)ack=1,表示producer寫partition leader成功后,broker就返回成功,無論其他的partition follower是否寫成功。當(dāng)ack=2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition
follower是否寫成功。當(dāng)ack=-1[parition的數(shù)量]的時候,表示只有producer全部寫成功的時候,才算成功,kafka broker才返回成功信息。這里需要注意的是,如果ack=1的時候,一旦有個broker宕機導(dǎo)致partition的follower和leader切換,會導(dǎo)致丟數(shù)據(jù)。
- message狀態(tài):在Kafka中,消息的狀態(tài)被保存在consumer中,broker不會關(guān)心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味著如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
- message持久化:Kafka中會把消息持久化到本地文件系統(tǒng)中,并且保持o(1)極高的效率。我們眾所周知IO讀取是非常耗資源的性能也是最慢的,這就是為了數(shù)據(jù)庫的瓶頸經(jīng)常在IO上,需要換SSD硬盤的原因。但是Kafka作為吞吐量極高的MQ,卻可以非常高效的message持久化到文件。這是因為Kafka是順序?qū)懭雘(1)的時間復(fù)雜度,速度非???。也是高吞吐量的原因。由于message的寫入持久化是順序?qū)懭氲?,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。一般的機器,單機每秒100k條數(shù)據(jù)。
- message有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當(dāng)然其中很多細節(jié)是可配置的。
- Produer : Producer向Topic發(fā)送message,不需要指定partition,直接發(fā)送就好了。kafka通過partition ack來控制是否發(fā)送成功并把信息返回給producer,producer可以有任意多的thread,這些kafka服務(wù)器端是不care的。Producer端的delivery guarantee默認是At
least once的。也可以設(shè)置Producer異步發(fā)送實現(xiàn)At most once。Producer可以用主鍵冪等性實現(xiàn)Exactly once
- Kafka高吞吐量: Kafka的高吞吐量體現(xiàn)在讀寫上,分布式并發(fā)的讀和寫都非常快,寫的性能體現(xiàn)在以o(1)的時間復(fù)雜度進行順序?qū)懭?。讀的性能體現(xiàn)在以o(1)的時間復(fù)雜度進行順序讀取, 對topic進行partition分區(qū),consume
group中的consume線程可以以很高能性能進行順序讀。
- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重復(fù)傳輸;(2)At least once 消息絕對不會丟,但是可能會重復(fù)傳輸;(3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。
- 批量發(fā)送:Kafka支持以消息集合為單位進行批量發(fā)送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產(chǎn)和消費是異步的。
- Kafka集群中broker之間的關(guān)系:不是主從關(guān)系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節(jié)點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對于0.7.x主要靠zookeeper來實現(xiàn)負載均衡)。
- 同步異步:Producer采用異步push方式,極大提高Kafka系統(tǒng)的吞吐率(可以通過參數(shù)控制是采用同步還是異步方式)。
- 分區(qū)機制partition:Kafka的broker端支持消息分區(qū)partition,Producer可以決定把消息發(fā)到哪個partition,在一個partition 中message的順序就是Producer發(fā)送消息的順序,一個topic中可以有多個partition,具體partition的數(shù)量是可配置的。partition的概念使得kafka作為MQ可以橫向擴展,吞吐量巨大。partition可以設(shè)置replica副本,replica副本存在不同的kafka
broker節(jié)點上,第一個partition是leader,其他的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。所以說kafka可以水平擴展,也就是擴展partition。
- 離線數(shù)據(jù)裝載:Kafka由于對可拓展的數(shù)據(jù)持久化的支持,它也非常適合向Hadoop或者數(shù)據(jù)倉庫中進行數(shù)據(jù)裝載。
- 實時數(shù)據(jù)與離線數(shù)據(jù):kafka既支持離線數(shù)據(jù)也支持實時數(shù)據(jù),因為kafka的message持久化到文件,并可以設(shè)置有效期,因此可以把kafka作為一個高效的存儲來使用,可以作為離線數(shù)據(jù)供后面的分析。當(dāng)然作為分布式實時消息系統(tǒng),大多數(shù)情況下還是用于實時的數(shù)據(jù)處理的,但是當(dāng)cosumer消費能力下降的時候可以通過message的持久化在淤積數(shù)據(jù)在kafka。
- 插件支持:現(xiàn)在不少活躍的社區(qū)已經(jīng)開發(fā)出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關(guān)的插件。
- 解耦: 相當(dāng)于一個MQ,使得Producer和Consumer之間異步的操作,系統(tǒng)之間解耦
- 冗余: replica有多個副本,保證一個broker node宕機后不會影響整個服務(wù)
- 擴展性: broker節(jié)點可以水平擴展,partition也可以水平增加,partition replica也可以水平增加
- 峰值: 在訪問量劇增的情況下,kafka水平擴展, 應(yīng)用仍然需要繼續(xù)發(fā)揮作用
- 可恢復(fù)性: 系統(tǒng)的一部分組件失效時,由于有partition的replica副本,不會影響到整個系統(tǒng)。
- 順序保證性:由于kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。
- 緩沖:由于producer那面可能業(yè)務(wù)很簡單,而后端consumer業(yè)務(wù)會很復(fù)雜并有數(shù)據(jù)庫的操作,因此肯定是producer會比consumer處理速度快,如果沒有kafka,producer直接調(diào)用consumer,那么就會造成整個系統(tǒng)的處理速度慢,加一層kafka作為MQ,可以起到緩沖的作用。
- 異步通信:作為MQ,Producer與Consumer異步通信
1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統(tǒng)的本身特性.且無論任何OS下,對文件系統(tǒng)本身的優(yōu)化是非常艱難的.文件緩存/直接內(nèi)存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數(shù),broker會將消息暫時buffer起來,當(dāng)消息的個數(shù)(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù).對于kafka而言,較高性能的磁盤,將會帶來更加直接的性能提升.
2.性能
除磁盤IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以將消息buffer起來,當(dāng)消息的條數(shù)達到一定閥值時,批量發(fā)送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對于kafka broker端,似乎有個sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無需進程再次copy和交換(這里涉及到"磁盤IO數(shù)據(jù)"/"內(nèi)核內(nèi)存"/"進程內(nèi)存"/"網(wǎng)絡(luò)緩沖區(qū)",多者之間的數(shù)據(jù)copy).
其實對于producer/consumer/broker三者而言,CPU的開支應(yīng)該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過壓縮.kafka支持gzip/snappy等多種壓縮方式.
3.負載均衡
kafka集群中的任何一個broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節(jié)點信息). 當(dāng)producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何"路由層".
異步發(fā)送,將多條消息暫且在客戶端buffer起來,并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實上提升了網(wǎng)絡(luò)效率;不過這也有一定的隱患,比如當(dāng)producer失效時,那些尚未發(fā)送的消息將會丟失。
4.Topic模型
其他JMS實現(xiàn),消息消費的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒有消費成功的消息重發(fā)等,同時還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態(tài)的控制,也沒有復(fù)雜的消息確認機制,可見kafka broker端是相當(dāng)輕量級的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級。
kafka中consumer負責(zé)維護消息的消費記錄,而broker則不關(guān)心這些,這種設(shè)計不僅提高了consumer端的靈活性,也適度的減輕了broker端設(shè)計的復(fù)雜度;這是和眾多JMS prodiver的區(qū)別.此外,kafka中消息ACK的設(shè)計也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數(shù)或者chunk的尺寸為單位)發(fā)送給consumer,當(dāng)消息消費成功后,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經(jīng)意識到,這種"寬松"的設(shè)計,將會有"丟失"消息/"消息重發(fā)"的危險.
5.消息傳輸一致
Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,恰好1次。
最少1次:可能會重傳數(shù)據(jù),有可能出現(xiàn)數(shù)據(jù)被重復(fù)處理的情況;
最多1次:可能會出現(xiàn)數(shù)據(jù)丟失情況;
恰好1次:并不是指真正只傳輸1次,只不過有一個機制。確保不會出現(xiàn)“數(shù)據(jù)被重復(fù)處理”和“數(shù)據(jù)丟失”的情況。
at most once: 消費者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過程中consumer進程失效(crash),導(dǎo)致部分消息未能繼續(xù)處理.那么此后可能其他consumer會接管,但是因為offset已經(jīng)提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".
at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異?;蛘遚onsumer失效,導(dǎo)致保存offset操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時可能獲得上次已經(jīng)處理過的消息,這就是"at least once".
"Kafka Cluster"到消費者的場景中可以采取以下方案來得到“恰好1次”的一致性語義:
最少1次+消費者的輸出中額外增加已處理消息最大編號:由于已處理消息最大編號的存在,不會出現(xiàn)重復(fù)處理消息的情況。
6.副本
kafka中,replication策略是基于partition,而不是topic;kafka將每個partition數(shù)據(jù)復(fù)制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數(shù)可以通過broker配置文件來設(shè)定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費消息并保存在本地日志中;leader負責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.當(dāng)所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網(wǎng)絡(luò)環(huán)境.即使只有一個replicas實例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.
選擇follower時需要兼顧一個問題,就是新leader server上所已經(jīng)承載的partition leader的個數(shù),如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.
7.log
每個log entry格式為"4個字節(jié)的數(shù)字N表示消息的長度" + "N個字節(jié)的消息內(nèi)容";每個日志都有一個offset來唯一的標記一條消息,offset的值為8個字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置..每個partition在物理存儲層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
獲取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.
8.分布式
kafka使用zookeeper來存儲一些meta信息,并使用了zookeeper watch機制來發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動作(比如consumer失效,觸發(fā)負載均衡等)
Broker node registry: 當(dāng)一個kafka broker啟動后,首先會向zookeeper注冊自己的節(jié)點信息(臨時znode),同時當(dāng)broker和zookeeper斷開連接時,此znode也會被刪除.
Broker Topic Registry: 當(dāng)一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每個consumer客戶端被創(chuàng)建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統(tǒng)生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節(jié)點,可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個消費者失效,其他consumer可以繼續(xù)消費.
Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節(jié)點表達了"一個partition"只能被group下一個consumer消費,同時當(dāng)group下某個consumer失效,那么將會觸發(fā)負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)
當(dāng)consumer啟動時,所觸發(fā)的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節(jié)點下注冊一個watch用來監(jiān)聽當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點列表變更,都會觸發(fā)此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節(jié)點下,注冊一個watch用來監(jiān)聽broker的存活情況;如果broker列表變更,將會觸發(fā)所有的groups下的consumer重新balance.
總結(jié):
1) Producer端使用zookeeper用來"發(fā)現(xiàn)"broker列表,以及和Topic下每個partition leader建立socket連接并發(fā)送消息.
2) Broker端使用zookeeper用來注冊broker信息,已經(jīng)監(jiān)測partition leader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息。
9.Leader的選擇
Kafka的核心是日志文件,日志文件在集群中的同步是分布式數(shù)據(jù)系統(tǒng)最基礎(chǔ)的要素。
如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質(zhì)量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統(tǒng)采用了多數(shù)投票法則選擇新的leader,對于多數(shù)投票法則,就是根據(jù)所有副本節(jié)點的狀況動態(tài)的選擇最適合的作為leader.Kafka并不是使用這種方法。
Kafka動態(tài)維護了一個同步狀態(tài)的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節(jié)點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節(jié)點讀取并追加到日志中了,才回通知外部這個消息已經(jīng)被提交了。因此這個集合中的任何一個節(jié)點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節(jié)點,就可以允許在f個節(jié)點down掉的情況下不會丟失消息并正常提供服。ISR的成員是動態(tài)的,如果一個節(jié)點被淘汰了,當(dāng)它重新達到“同步中”的狀態(tài)時,他可以重新加入ISR.這種leader的選擇方式是非??焖俚?,適合kafka的應(yīng)用場景。
一個邪惡的想法:如果所有節(jié)點都down掉了怎么辦?Kafka對于數(shù)據(jù)不會丟失的保證,是基于至少一個節(jié)點是存活的,一旦所有節(jié)點都down了,這個就不能保證了。
實際應(yīng)用中,當(dāng)所有的副本都down掉時,必須及時作出反應(yīng)??梢杂幸韵聝煞N選擇:
1. 等待ISR中的任何一個節(jié)點恢復(fù)并擔(dān)任leader。
2. 選擇所有節(jié)點中(不只是ISR)第一個恢復(fù)的節(jié)點作為leader.
這是一個在可用性和連續(xù)性之間的權(quán)衡。如果等待ISR中的節(jié)點恢復(fù),一旦ISR中的節(jié)點起不起來或者數(shù)據(jù)都是了,那集群就永遠恢復(fù)不了了。如果等待ISR意外的節(jié)點恢復(fù),這個節(jié)點的數(shù)據(jù)就會被作為線上數(shù)據(jù),有可能和真實的數(shù)據(jù)有所出入,因為有些數(shù)據(jù)它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據(jù)場景靈活的選擇。
這種窘境不只Kafka會遇到,幾乎所有的分布式數(shù)據(jù)系統(tǒng)都會遇到。
10.副本管理
以上僅僅以一個topic一個分區(qū)為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區(qū).Kafka盡量的使所有分區(qū)均勻的分布到集群所有的節(jié)點上而不是集中在某些節(jié)點上,另外主從關(guān)系也盡量均衡這樣每個幾點都會擔(dān)任一定比例的分區(qū)的leader.
優(yōu)化leader的選擇過程也是很重要的,它決定了系統(tǒng)發(fā)生故障時的空窗期有多久。Kafka選擇一個節(jié)點作為“controller”,當(dāng)發(fā)現(xiàn)有節(jié)點down掉的時候它負責(zé)在游泳分區(qū)的所有節(jié)點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區(qū)節(jié)點的主從關(guān)系。如果controller down掉了,活著的節(jié)點中的一個會備切換為新的controller.
11.Leader與副本同步
對于某個分區(qū)來說,保存正分區(qū)的"broker"為該分區(qū)的"leader",保存?zhèn)浞莘謪^(qū)的"broker"為該分區(qū)的"follower"。備份分區(qū)會完全復(fù)制正分區(qū)的消息,包括消息的編號等附加屬性值。為了保持正分區(qū)和備份分區(qū)的內(nèi)容一致,Kafka采取的方案是在保存?zhèn)浞莘謪^(qū)的"broker"上開啟一個消費者進程進行消費,從而使得正分區(qū)的內(nèi)容與備份分區(qū)的內(nèi)容保持一致。一般情況下,一個分區(qū)有一個“正分區(qū)”和零到多個“備份分區(qū)”??梢耘渲谩罢謪^(qū)+備份分區(qū)”的總數(shù)量,關(guān)于這個配置,不同主題可以有不同的配置值。注意,生產(chǎn)者,消費者只與保存正分區(qū)的"leader"進行通信。
Kafka允許topic的分區(qū)擁有若干副本,這個數(shù)量是可以配置的,你可以為每個topic配置副本的數(shù)量。Kafka會自動在每個副本上備份數(shù)據(jù),所以當(dāng)一個節(jié)點down掉時數(shù)據(jù)依然是可用的。
Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當(dāng)于只有一份數(shù)據(jù)。
創(chuàng)建副本的單位是topic的分區(qū),每個分區(qū)都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區(qū)的數(shù)量都比broker的數(shù)量多的多,各分區(qū)的leader均勻的分布在brokers中。所有的followers都復(fù)制leader的日志,日志中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那里拉取消息并保存在自己的日志文件中。
許多分布式的消息系統(tǒng)自動的處理失敗的請求,它們對一個節(jié)點是否著(alive)”有著清晰的定義。Kafka判斷一個節(jié)點是否活著有兩個條件:
1. 節(jié)點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節(jié)點的連接。
2. 如果節(jié)點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。
符合以上條件的節(jié)點準確的說應(yīng)該是“同步中的(in sync)”,而不是模糊的說是“活著的”或是“失敗的”。Leader會追蹤所有“同步中”的節(jié)點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至于延時多久算是“太久”,是由參數(shù)replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數(shù)replica.lag.time.max.ms決定的。
只有當(dāng)消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發(fā)送給consumer,這樣就不用擔(dān)心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數(shù)acks決定的。
Kafka保證只要有一個“同步中”的節(jié)點,“committed”的消息就不會丟失。