Kafka是當(dāng)前分布式系統(tǒng)中最流行的消息中間件之一,憑借著其高吞吐量的設(shè)計(jì),在日志收集系統(tǒng)和消息系統(tǒng)的應(yīng)用場(chǎng)景中深得開(kāi)發(fā)者喜愛(ài)。本篇就聊聊Kafka相關(guān)的一些知識(shí)點(diǎn)。主要包括以下內(nèi)容: Kafka簡(jiǎn)介 Kafka特點(diǎn) Kafka基本概念 Kafka架構(gòu)
Kafka的幾個(gè)核心概念 分區(qū)Partition 復(fù)制Replication 消息發(fā)送 消費(fèi)者組 消費(fèi)偏移量
Kafka的工程應(yīng)用
Kafka簡(jiǎn)介Kafka特點(diǎn)Kafka是最初由Linkedin公司開(kāi)發(fā),是一個(gè)分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當(dāng)做MQ系統(tǒng)),常見(jiàn)可以用于web/nginx日志、訪問(wèn)日志,消息服務(wù)等等,Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源項(xiàng)目。相比于其他的消息隊(duì)列中間件,Kafka的主要設(shè)計(jì)目標(biāo),也即其特點(diǎn)如下: 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能。 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸。 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸。 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。 Scale out:支持在線水平擴(kuò)展
Kafka基本概念Broker Kaka集群中的一臺(tái)或多臺(tái)服務(wù)器稱(chēng)為Broker。Broker存儲(chǔ)Topic的數(shù)據(jù)。 如果某topic有N個(gè)partition,集群有N個(gè)broker,那么每個(gè)broker存儲(chǔ)該topic的一個(gè)partition。 如果某topic有N個(gè)partition,集群有(N+M)個(gè)broker,那么其中有N個(gè)broker存儲(chǔ)該topic的一個(gè)partition,剩下的M個(gè)broker不存儲(chǔ)該topic的partition數(shù)據(jù)。 如果某topic有N個(gè)partition,集群中broker數(shù)目少于N個(gè),那么一個(gè)broker存儲(chǔ)該topic的一個(gè)或多個(gè)partition。在實(shí)際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。
Topic Partition 物理上的Topic分區(qū),一個(gè)Topic可以分為多個(gè)Partition,至少有一個(gè)Partition。 每個(gè)Partition中的數(shù)據(jù)使用多個(gè)segment文件存儲(chǔ),每個(gè)Partition都是一個(gè)有序的隊(duì)列,不同Partition間的數(shù)據(jù)是無(wú)序的。 Partition中的每條消息都會(huì)被分配一個(gè)有序的ID(即offset)。
Producer 消息和數(shù)據(jù)的生產(chǎn)者。Producer將消息發(fā)布到Kafka的topic中。 Broker接收到Producer發(fā)布的消息后,Broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中。 Producer發(fā)送的消息,存儲(chǔ)到一個(gè)Partition中,Producer也可以指定數(shù)據(jù)存儲(chǔ)的Partition。
Consumer Consumer Group 每個(gè)消費(fèi)者都屬于一個(gè)特定的消費(fèi)者組。 可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group。 一個(gè)Topic可以有多個(gè)消費(fèi)者組,Topic的消息會(huì)被復(fù)制到所有的消費(fèi)者組中,但每個(gè)消費(fèi)者組只會(huì)把消息發(fā)送給該組中的一個(gè)消費(fèi)者。 消費(fèi)者組是Kafka用來(lái)實(shí)現(xiàn)一個(gè)Topic消息的廣播和單播的手段。
Leader Follower Follower跟隨Leader,所有寫(xiě)請(qǐng)求都通過(guò)Leader路由,數(shù)據(jù)變更會(huì)廣播給所有Follower,F(xiàn)ollower與Leader保持?jǐn)?shù)據(jù)同步。 如果Leader失效,則從Follower中選舉出一個(gè)新的Leader。 如果Follower與Leader掛掉、卡住或同步太慢,Leader會(huì)把這個(gè)Follower從"in sync replicas"## 高吞吐量的分布式消息組件Kafka是如何工作的
Kafka是當(dāng)前分布式系統(tǒng)中最流行的消息中間件之一,憑借著其高吞吐量的設(shè)計(jì),在日志收集系統(tǒng)和消息系統(tǒng)的應(yīng)用場(chǎng)景中深得開(kāi)發(fā)者喜愛(ài)。本篇就聊聊Kafka相關(guān)的一些知識(shí)點(diǎn)。主要包括以下內(nèi)容: Kafka簡(jiǎn)介 Kafka特點(diǎn) Kafka基本概念 Kafka架構(gòu)
Kafka的幾個(gè)核心概念 分區(qū)Partition 復(fù)制Replication 消息發(fā)送 消費(fèi)者組 消費(fèi)偏移量
Kafka的工程應(yīng)用
Kafka簡(jiǎn)介Kafka特點(diǎn)Kafka是最初由Linkedin公司開(kāi)發(fā),是一個(gè)分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當(dāng)做MQ系統(tǒng)),常見(jiàn)可以用于web/nginx日志、訪問(wèn)日志,消息服務(wù)等等,Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源項(xiàng)目。相比于其他的消息隊(duì)列中間件,Kafka的主要設(shè)計(jì)目標(biāo),也即其特點(diǎn)如下: 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能。 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸。 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸。 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。 Scale out:支持在線水平擴(kuò)展
Kafka基本概念Broker Kaka集群中的一臺(tái)或多臺(tái)服務(wù)器稱(chēng)為Broker。Broker存儲(chǔ)Topic的數(shù)據(jù)。 如果某topic有N個(gè)partition,集群有N個(gè)broker,那么每個(gè)broker存儲(chǔ)該topic的一個(gè)partition。 如果某topic有N個(gè)partition,集群有(N+M)個(gè)broker,那么其中有N個(gè)broker存儲(chǔ)該topic的一個(gè)partition,剩下的M個(gè)broker不存儲(chǔ)該topic的partition數(shù)據(jù)。 如果某topic有N個(gè)partition,集群中broker數(shù)目少于N個(gè),那么一個(gè)broker存儲(chǔ)該topic的一個(gè)或多個(gè)partition。在實(shí)際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。
Topic Partition 物理上的Topic分區(qū),一個(gè)Topic可以分為多個(gè)Partition,至少有一個(gè)Partition。 每個(gè)Partition中的數(shù)據(jù)使用多個(gè)segment文件存儲(chǔ),每個(gè)Partition都是一個(gè)有序的隊(duì)列,不同Partition間的數(shù)據(jù)是無(wú)序的。 Partition中的每條消息都會(huì)被分配一個(gè)有序的ID(即offset)。
Producer 消息和數(shù)據(jù)的生產(chǎn)者。Producer將消息發(fā)布到Kafka的topic中。 Broker接收到Producer發(fā)布的消息后,Broker將該消息追加到當(dāng)前用于追加數(shù)據(jù)的segment文件中。 Producer發(fā)送的消息,存儲(chǔ)到一個(gè)Partition中,Producer也可以指定數(shù)據(jù)存儲(chǔ)的Partition。
Consumer Consumer Group 每個(gè)消費(fèi)者都屬于一個(gè)特定的消費(fèi)者組。 可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group。 一個(gè)Topic可以有多個(gè)消費(fèi)者組,Topic的消息會(huì)被復(fù)制到所有的消費(fèi)者組中,但每個(gè)消費(fèi)者組只會(huì)把消息發(fā)送給該組中的一個(gè)消費(fèi)者。 消費(fèi)者組是Kafka用來(lái)實(shí)現(xiàn)一個(gè)Topic消息的廣播和單播的手段。
Leader Follower Follower跟隨Leader,所有寫(xiě)請(qǐng)求都通過(guò)Leader路由,數(shù)據(jù)變更會(huì)廣播給所有Follower,F(xiàn)ollower與Leader保持?jǐn)?shù)據(jù)同步。 如果Leader失效,則從Follower中選舉出一個(gè)新的Leader。 如果Follower與Leader掛掉、卡住或同步太慢,Leader會(huì)把這個(gè)Follower從"in sync replicas"列表中刪除,重新創(chuàng)建一個(gè)Follower。
Kafka架構(gòu)Kafka一般以集群方式來(lái)部署,一個(gè)典型的Kafka集群架構(gòu)如下圖所示: Kafka的幾個(gè)核心概念分區(qū)Partition分區(qū)的幾個(gè)特點(diǎn) 分區(qū)是Kafka的基本存儲(chǔ)單元,在一個(gè)Topic中會(huì)有一個(gè)或多個(gè)Partition,不同的Partition可位于不同的服務(wù)器節(jié)點(diǎn)上,物理上一個(gè)Partition對(duì)應(yīng)于一個(gè)文件夾。 Partition內(nèi)包含一個(gè)或多個(gè)Segment,每個(gè)Segment又包含一個(gè)數(shù)據(jù)文件和一個(gè)與之對(duì)應(yīng)的索引文件。 對(duì)于寫(xiě)操作,每次只會(huì)寫(xiě)Partition內(nèi)的一個(gè)Segment;對(duì)于讀操作,也只會(huì)順序讀取同一個(gè)Partition內(nèi)的不同Segment。 邏輯上,可以把Partition當(dāng)做一個(gè)非常長(zhǎng)的數(shù)組,使用時(shí)通過(guò)這個(gè)數(shù)組的索引(offset)訪問(wèn)數(shù)據(jù)。
高吞吐量設(shè)計(jì)分區(qū)正是Kafka高吞吐量設(shè)計(jì)的方法之一,具體體現(xiàn)在這樣幾點(diǎn): 由于不同的Partition可位于不同的機(jī)器上,因此可以實(shí)現(xiàn)機(jī)器間的并行處理。 由于一個(gè)Partition對(duì)應(yīng)一個(gè)文件夾,多個(gè)Partition也可位于同一臺(tái)服務(wù)器上,這樣就可以在同一臺(tái)服務(wù)器上使不同的Partition對(duì)應(yīng)不同的磁盤(pán),實(shí)現(xiàn)磁盤(pán)間的并行處理。 故一般通過(guò)增加Partition的數(shù)量來(lái)提高系統(tǒng)的并行吞吐量,但也會(huì)增加輕微的延遲。
但以下這幾種情況需要注意: 當(dāng)一個(gè)Topic有多個(gè)消費(fèi)者時(shí),一個(gè)消息只會(huì)被一個(gè)消費(fèi)者組里的一個(gè)消費(fèi)者消費(fèi); 由于消息是以Partition為單位分配的,在不考慮Rebalance時(shí),同一個(gè)Partition的數(shù)據(jù)只會(huì)被一個(gè)消費(fèi)者消費(fèi),所以如果消費(fèi)者的數(shù)量多于Partition的數(shù)量,就會(huì)存在部分消費(fèi)者不能消費(fèi)該Topic的情況,此時(shí)再增加消費(fèi)者并不能提高系統(tǒng)的吞吐量; 在生產(chǎn)者和Broker的角度,對(duì)不同Partition的寫(xiě)操作是完全并行的,可是對(duì)于消費(fèi)者其并發(fā)數(shù)則取決于Partition的數(shù)量。實(shí)際中配置的Partition數(shù)量需要根據(jù)所設(shè)計(jì)的系統(tǒng)吞吐量來(lái)推算。
復(fù)制復(fù)制原理Kafka利用zookeeper來(lái)維護(hù)集群成員的信息,每個(gè)Broker實(shí)例都會(huì)被設(shè)置一個(gè)唯一的標(biāo)識(shí)符,Broker在啟動(dòng)時(shí)會(huì)通過(guò)創(chuàng)建臨時(shí)節(jié)點(diǎn)的方式把自己的唯一標(biāo)識(shí)注冊(cè)到zookeeper中,Kafka中的其他組件會(huì)監(jiān)視Zookeeper里的/broker/ids路徑,所以當(dāng)集群中有Broker加入或退出時(shí),其他組件就會(huì)收到通知。集群間數(shù)據(jù)的復(fù)制機(jī)制,在Kafka中是通過(guò)Zookeeper提供的leader選舉方式實(shí)現(xiàn)數(shù)據(jù)復(fù)制方案。基本原理是:首先選舉出一個(gè)leader,其他副本作為Follower,所有的寫(xiě)操作都先發(fā)給leader,然后再由leader把消息發(fā)給Follower。復(fù)制功能是Kafka架構(gòu)的核心之一,因?yàn)樗梢栽趥€(gè)別節(jié)點(diǎn)不可用時(shí)還能保證Kafka整體的可用性。Kafka中的復(fù)制操作也是針對(duì)分區(qū)的。一個(gè)分區(qū)有多個(gè)副本,副本被保存在Broker上,每個(gè)Broker都可以保存上千個(gè)屬于不同Topic和分區(qū)的副本。副本有兩種類(lèi)型: leader副本:每個(gè)分區(qū)都會(huì)有,所有生產(chǎn)者和消費(fèi)者的請(qǐng)求都會(huì)經(jīng)過(guò)leader; follower副本:不處理客戶端的請(qǐng)求,它的職責(zé)是從leader處復(fù)制消息數(shù)據(jù),使自己和leader的狀態(tài)保持一致; 如果leader節(jié)點(diǎn)宕機(jī),那么某個(gè)follower就會(huì)被選為leader繼續(xù)對(duì)外提供服務(wù); 復(fù)制因子:一個(gè)分區(qū)有幾個(gè)副本。
消息發(fā)送方式從生產(chǎn)者的角度來(lái)看,消息發(fā)送到Broker有三種方式: 立即發(fā)送:只發(fā)送消息,不關(guān)心消息發(fā)送的結(jié)果。本質(zhì)上也是一種異步發(fā)送的方式,消息先存儲(chǔ)在緩沖區(qū)中,達(dá)到設(shè)定條件后批量發(fā)送。當(dāng)然這是kafka吞吐量最高的一種方式,并配合參數(shù)acks=0,這樣生產(chǎn)者不需要等待服務(wù)器的響應(yīng),以網(wǎng)絡(luò)能支持的最大速度發(fā)送消息。但是也是消息最不可靠的一種方式,因?yàn)閷?duì)于發(fā)送失敗的消息沒(méi)有做任何處理。 同步發(fā)送:生產(chǎn)者發(fā)送消息后獲取返回的Future對(duì)象,根據(jù)該對(duì)象的結(jié)果查看發(fā)送是否成功。如果業(yè)務(wù)要求消息必須是按順序發(fā)送的,那么可以使用同步的方式,并且只能在一個(gè)partation上,結(jié)合參數(shù)設(shè)置retries的值讓發(fā)送失敗時(shí)重試,設(shè)置max_in_flight_requests_per_connection=1,可以控制生產(chǎn)者在收到服務(wù)器晌應(yīng)之前只能發(fā)送1個(gè)消息,在消息發(fā)送成功后立刻flush,從而控制消息順序發(fā)送。 異步發(fā)送:生產(chǎn)者發(fā)送消息時(shí)將注冊(cè)的回調(diào)函數(shù)作為入?yún)魅?,生產(chǎn)者接收到Kafka服務(wù)器的響應(yīng)時(shí)會(huì)觸發(fā)執(zhí)行回調(diào)函數(shù)。如果業(yè)務(wù)需要知道消息發(fā)送是否成功,并且對(duì)消息的順序不關(guān)心,那么可以用異步+回調(diào)的方式來(lái)發(fā)送消息,配合參數(shù)retries=0,并將發(fā)送失敗的消息記錄到日志文件中。
消息發(fā)送確認(rèn)消息發(fā)送到Broker后怎么算投遞成功呢,Kafka有三種確認(rèn)模式: 不等Broker確認(rèn)就認(rèn)為投遞成功; 由leader來(lái)確認(rèn)投遞成功; 由所有的leader和follower都確認(rèn)才認(rèn)為是成功的。
三種模式對(duì)比的話,性能依次降低,但可靠性依次提高。 消息重發(fā)機(jī)制當(dāng)從Broker接收到的是臨時(shí)可恢復(fù)的異常時(shí),生產(chǎn)者會(huì)向Broker重發(fā)消息,重發(fā)次數(shù)的限制值由初始化生產(chǎn)者對(duì)象的retries屬性決定,在默認(rèn)情況下生產(chǎn)者會(huì)在重試后等待100ms,可以通過(guò)retry.backoff.ms屬性進(jìn)行修改。 批次發(fā)送當(dāng)有多條消息要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放到同一個(gè)批次里,Kafka通過(guò)批次的概念來(lái)提高吞吐量,但同時(shí)也會(huì)增加延遲。對(duì)批次的控制主要通過(guò)構(gòu)建生產(chǎn)者對(duì)象時(shí)的兩個(gè)屬性來(lái)實(shí)現(xiàn): batch.size:當(dāng)發(fā)往每個(gè)分區(qū)的緩存消息數(shù)量達(dá)到這個(gè)數(shù)值時(shí),就會(huì)觸發(fā)一次網(wǎng)絡(luò)請(qǐng)求,批次里的所有消息都會(huì)被發(fā)送出去; linger.ms:每條消息在緩存中的最長(zhǎng)時(shí)間,如果超過(guò)這個(gè)時(shí)間就會(huì)忽略batch.size的限制,由客戶端立即把消息發(fā)送出去。
消費(fèi)者組消費(fèi)者組是Kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)機(jī)制,在一個(gè)消費(fèi)者組內(nèi)可以有多個(gè)消費(fèi)者,它們共享一個(gè)唯一標(biāo)識(shí),即分組ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)消費(fèi)它們訂閱的主題下的所有分區(qū)的消息,但一個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組里的一個(gè)消費(fèi)者來(lái)消費(fèi)。 廣播和單播一個(gè)Topic可以有多個(gè)消費(fèi)者組,Topic的消息會(huì)被復(fù)制到所有的消費(fèi)者組中,但每個(gè)消費(fèi)者組只會(huì)把消息發(fā)送給一個(gè)消費(fèi)者組里的某一個(gè)消費(fèi)者。如果要實(shí)現(xiàn)廣播,只需為每個(gè)消費(fèi)者都分配一個(gè)單獨(dú)的消費(fèi)者組接口如果要實(shí)現(xiàn)單播,則需要把所有的消費(fèi)者都設(shè)置在同一個(gè)消費(fèi)者組里 再均衡消費(fèi)者組里有新消費(fèi)者加入或者有消費(fèi)者離開(kāi),分區(qū)所有權(quán)會(huì)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者再均衡協(xié)議規(guī)定了一個(gè)消費(fèi)者組下的所有消費(fèi)者如何達(dá)成一致來(lái)分配主題下的每個(gè)分區(qū)觸發(fā)再均衡的場(chǎng)景有三種: 消費(fèi)偏移量Kafka中有一個(gè)叫作_consumer_offset特殊主題用來(lái)保存消息在每個(gè)分區(qū)的偏移量,消費(fèi)者每次消費(fèi)時(shí)都會(huì)往這個(gè)主題中發(fā)送消息,消息包含每個(gè)分區(qū)的偏移量。如果消費(fèi)者一直處于運(yùn)行狀態(tài),偏移量沒(méi)什么作用;如果消費(fèi)者崩潰或者有新的消費(fèi)者加入消費(fèi)者組從而觸發(fā)再均衡操作,再均衡之后該分區(qū)的消費(fèi)者若不是之前的那個(gè),提交偏移量就有用了。維護(hù)消息偏移量對(duì)于避免消息被重復(fù)消費(fèi)和遺漏消費(fèi),確保消息的ExactlyOnce至關(guān)重要,以下是不同的提交偏移量的方式: 自動(dòng)提交:Kafka默認(rèn)會(huì)定期自動(dòng)提交偏移量,提交的時(shí)間間隔默認(rèn)是5秒。此方式會(huì)產(chǎn)生重復(fù)處理消息的問(wèn)題; 手動(dòng)提交:在進(jìn)行手動(dòng)提交之前需要先關(guān)閉消費(fèi)者的自動(dòng)提交配置,然后用commitSync方法來(lái)提交偏移量。處理完記錄后由開(kāi)發(fā)者確保調(diào)用了commitSync方法,來(lái)減少重復(fù)處理消息的數(shù)量,但可能降低消費(fèi)者的吞吐量; 異步提交:使用commitASync方法來(lái)提交最后一個(gè)偏移量。消費(fèi)者只管發(fā)送提交請(qǐng)求,而不需要等待Broker的立即回應(yīng)。
Kafka的工程應(yīng)用Kafka主要用于三種場(chǎng)景: 基于Kafka的用戶行為數(shù)據(jù)采集要獲取必要的數(shù)據(jù)進(jìn)行用戶行為等的分析,需要這樣幾個(gè)步驟: 基于Kafka的日志收集各個(gè)應(yīng)用系統(tǒng)在輸出日志時(shí)利用高吞吐量的Kafka作為數(shù)據(jù)緩沖平臺(tái),將日志統(tǒng)一輸出到Kafka,再通過(guò)Kafka以統(tǒng)一接口服務(wù)的方式開(kāi)放給各種消費(fèi)者。做統(tǒng)一日志平臺(tái)的方案,收集重要系統(tǒng)的日志集中到Kafka中,然后再導(dǎo)入ElasticSearch、HDFS、Storm等具體日志數(shù)據(jù)的消費(fèi)者中,用于進(jìn)行實(shí)時(shí)搜索分析、離線統(tǒng)計(jì)、數(shù)據(jù)備份、大數(shù)據(jù)分析等。 基于Kafka的流量削峰為了讓系統(tǒng)在大流量場(chǎng)景下仍然可用,可以在系統(tǒng)中的重點(diǎn)業(yè)務(wù)環(huán)節(jié)加入消息隊(duì)列作為消息流的緩沖,從而避免短時(shí)間內(nèi)產(chǎn)生的高流量帶來(lái)的壓垮整個(gè)應(yīng)用的問(wèn)題。
|