小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Kafka是如何保證高性能和高吞吐量的?

 xxcc140 2019-11-13

Kafka是一款分布式消息發(fā)布和訂閱系統(tǒng),它的特點是高性能、高吞吐量。

最早設計的目的是作為LinkedIn的活動流和運營數(shù)據(jù)的處理管道。這些數(shù)據(jù)主要是用來對用戶做用戶畫像分析以及服務器性能數(shù)據(jù)的一些監(jiān)控。

所以kafka一開始設計的目標就是作為一個分布式、高吞吐量的消息系統(tǒng),所以適合運用在大數(shù)據(jù)傳輸場景。

Kafka的應用場景

由于kafka具有更好的吞吐量、內(nèi)置分區(qū)、冗余及容錯性的優(yōu)點(kafka每秒可以處理幾十萬消息),讓kafka成為了一個很好的大規(guī)模消息處理應用的解決方案。所以在企業(yè)級應用長,主要會應用于如下幾個方面

  • 行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為。通過發(fā)布-訂閱模式實時記錄到對應的topic中,通過后端大數(shù)據(jù)平臺接入處理分析,并做更進一步的實時處理和監(jiān)控
  • 日志收集:日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示從服務器上收集日志文件,然后放到一個集中的平臺(文件服務器)進行處理。在實際應用開發(fā)中,我們應用程序的log都會輸出到本地的磁盤上,排查問題的話通過linux命令來搞定,如果應用程序組成了負載均衡集群,并且集群的機器有幾十臺以上,那么想通過日志快速定位到問題,就是很麻煩的事情了。所以一般都會做一個日志統(tǒng)一收集平臺管理log日志用來快速查詢重要應用的問題。所以很多公司的套路都是把應用日志集中到kafka上,然后分別導入到es和hdfs上,用來做實時檢索分析和離線統(tǒng)計數(shù)據(jù)備份等。而另一方面,kafka本身又提供了很好的api來集成日志并且做日志收集。
    image.png

Kafka的架構

一個典型的kafka集群包含若干Producer(可以是應用節(jié)點產(chǎn)生的消息,也可以是通過Flume收集日志產(chǎn)生的事件),若干個Broker(kafka支持水平擴展)、若干個Consumer Group,以及一個zookeeper集群。kafka通過zookeeper管理集群配置及服務協(xié)同。Producer使用push模式將消息發(fā)布到broker,consumer通過監(jiān)聽使用pull模式從broker訂閱并消費消息。

多個broker協(xié)同工作,producer和consumer部署在各個業(yè)務邏輯中。三者通過zookeeper管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)。

圖上有一個細節(jié)是和其他mq中間件不同的點,producer 發(fā)送消息到broker的過程是push,而consumer從broker消費消息的過程是pull,主動去拉數(shù)據(jù)。而不是broker把數(shù)據(jù)主動發(fā)送給consumer。

Kafka是如何保證高性能和高吞吐量的?

名詞解釋

1)Broker
Kafka集群包含一個或多個服務器,這種服務器被稱為broker。broker端不維護數(shù)據(jù)的消費狀態(tài),提升了性能。直接使用磁盤進行存儲,線性讀寫,速度快:避免了數(shù)據(jù)在JVM內(nèi)存和系統(tǒng)內(nèi)存之間的復制,減少耗性能的創(chuàng)建對象和垃圾回收。
2)Producer
負責發(fā)布消息到Kafka broker
3)Consumer
消息消費者,向Kafka broker讀取消息的客戶端,consumer從broker拉取(pull)數(shù)據(jù)并進行處理。
4)Topic
每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關心數(shù)據(jù)存于何處)
5)Partition
Parition是物理上的概念,每個Topic包含一個或多個Partition.
6)Consumer Group
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)
7)Topic & Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個topic,且分別有13個和19個分區(qū),則整個集群上會相應會生成共32個文件夾(本文所用集群共8個節(jié)點,此處topic1和topic2 replication-factor均為1)。

Java中使用kafka進行通信

依賴

org.apache.kafka

kafka-clients

2.0.0

發(fā)送端代碼

Kafka是如何保證高性能和高吞吐量的?

消費端代碼

Kafka是如何保證高性能和高吞吐量的?

異步發(fā)送

Kafka是如何保證高性能和高吞吐量的?

batch.size

生產(chǎn)者發(fā)送多個消息到broker上的同一個分區(qū)時,為了減少網(wǎng)絡請求帶來的性能開銷,通過批量的方式來提交消息,可以通過這個參數(shù)來控制批量提交的字節(jié)數(shù)大小,默認大小是16384byte,也就是16kb,意味著當一批消息大小達到指定的batch.size的時候會統(tǒng)一發(fā)送

linger.ms

Producer默認會把兩次發(fā)送時間間隔內(nèi)收集到的所有Requests進行一次聚合然后再發(fā)送,以此提高吞吐量,而linger.ms就是為每次發(fā)送到broker的請求增加一些delay,以此來聚合更多的Message請求。這個有點想TCP里面的Nagle算法,在TCP協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了Nagle算法,也就是基于小包的等-停協(xié)議。
batch.size和linger.ms這兩個參數(shù)是kafka性能優(yōu)化的關鍵參數(shù),batch.size和linger.ms這兩者的作用是一樣的,如果兩個都配置了,那么怎么工作的呢?實際上,當二者都配置的時候,只要滿足其中一個要求,就會發(fā)送請求到broker上

一些基礎配置分析

group.id

consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個consumer來消費.如下圖所示,分別有三個消費者,屬于兩個不同的group,那么對于firstTopic這個topic來說,這兩個組的消費者都能同時消費這個topic中的消息,對于此時的架構來說,這個firstTopic就類似于ActiveMQ中的topic概念。如右圖所示,如果3個消費者都屬于同一個group,那么此時firstTopic就是一個Queue的概念

Kafka是如何保證高性能和高吞吐量的?

Kafka是如何保證高性能和高吞吐量的?

enable.auto.commit

消費者消費消息以后自動提交,只有當消息提交以后,該消息才不會被再次接收到,還可以配合auto.commit.interval.ms控制自動提交的頻率。
當然,我們也可以通過consumer.commitSync()的方式實現(xiàn)手動提交

auto.offset.reset

這個參數(shù)是針對新的groupid中的消費者而言的,當有新groupid的消費者來消費指定的topic時,對于該參數(shù)的配置,會有不同的語義。
auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的offset處開始消費Topic下的消息。
auto.offset.reset= earliest情況下,新的消費者會從該topic最早的消息開始消費。
auto.offset.reset=none情況下,新的消費者加入以后,由于之前不存在offset,則會直接拋出異常。

max.poll.records

此設置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預測每次poll間隔要處理的最大值。通過調(diào)整此值,可以減少poll間隔

原理分析

從前面的整個演示過程來看,只要不是超大規(guī)模的使用kafka,那么基本上沒什么大問題,否則,對于kafka本身的運維的挑戰(zhàn)會很大,同時,針對每一個參數(shù)的調(diào)優(yōu)也顯得很重要。
據(jù)我了解,快手在使用kafka集群規(guī)模是挺大的,他們在19年的開發(fā)者大會上有提到

關于Topic和Partition

Topic

在kafka中,topic是一個存儲消息的邏輯概念,可以認為是一個消息集合。每條消息發(fā)送到kafka集群的消息都有一個類別。物理上來說,不同的topic的消息是分開存儲的,
每個topic可以有多個生產(chǎn)者向它發(fā)送消息,也可以有多個消費者去消費其中的消息。

Kafka是如何保證高性能和高吞吐量的?

image.png

Partition

每個topic可以劃分多個分區(qū)(每個Topic至少有一個分區(qū)),同一topic下的不同分區(qū)包含的消息是不同的。每個消息在被添加到分區(qū)時,都會被分配一個offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號,kafka通過offset保證消息在分區(qū)內(nèi)的順序,offset的順序不跨分區(qū),即kafka只保證在同一個分區(qū)內(nèi)的消息是有序的。

下圖中,對于名字為test的topic,做了3個分區(qū),分別是p0、p1、p2.
每一條消息發(fā)送到broker時,會根據(jù)partition的規(guī)則選擇存儲到哪一個partition。如果partition規(guī)則設置合理,那么所有的消息會均勻的分布在不同的partition中,這樣就有點類似數(shù)據(jù)庫的分庫分表的概念,把數(shù)據(jù)做了分片處理。

Kafka是如何保證高性能和高吞吐量的?

image.png

Topic&Partition的存儲

Partition是以文件的形式存儲在文件系統(tǒng)中,比如創(chuàng)建一個名為firstTopic的topic,其中有3個partition,那么在kafka的數(shù)據(jù)目錄(/tmp/kafka-log)中就有3個目錄,firstTopic-0~3, 命名規(guī)則是-

關于消息分發(fā)

kafka消息分發(fā)策略

消息是kafka中最基本的數(shù)據(jù)單元,在kafka中,一條消息由key、value兩部分構成,在發(fā)送一條消息時,我們可以指定這個key,那么producer會根據(jù)key和partition機制來判斷當前這條消息應該發(fā)送并存儲到哪個partition中。我們可以根據(jù)需要進行擴展producer的partition機制。

自定義Partitioner

發(fā)送端代碼添加自定義分區(qū)

消息默認的分發(fā)機制

默認情況下,kafka采用的是hash取模的分區(qū)算法。如果Key為null,則會隨機分配一個分區(qū)。這個隨機是在這個參數(shù)”metadata.max.age.ms”的時間范圍內(nèi)隨機選擇一個。對于這個時間段內(nèi),如果key為null,則只會發(fā)送到唯一的分區(qū)。這個值值哦默認情況下是10分鐘更新一次。

關于Metadata,這個之前沒講過,簡單理解就是Topic/Partition和broker的映射關系,每一個topic的每一個partition,需要知道對應的broker列表是什么,leader是誰、follower是誰。這些信息都是存儲在Metadata這個類里面。

消費端如何消費指定的分區(qū)

通過下面的代碼,就可以消費指定該topic下的0號分區(qū)。其他分區(qū)的數(shù)據(jù)就無法接收

消息的消費原理

在實際生產(chǎn)過程中,每個topic都會有多個partitions,多個partitions的好處在于,一方面能夠?qū)roker上的數(shù)據(jù)進行分片有效減少了消息的容量從而提升io性能。另外一方面,為了提高消費端的消費能力,一般會通過多個consumer去消費同一個topic ,也就是消費端的負載均衡機制,也就是我們接下來要了解的,在多個partition以及多個consumer的情況下,消費者是如何消費消息的。
kafka存在consumer group的概念,也就是group.id一樣的consumer,這些consumer屬于一個consumer group,組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū)。當然每一個分區(qū)只能由同一個消費組內(nèi)的consumer來消費,那么同一個consumergroup里面的consumer是怎么去分配該消費哪個分區(qū)里的數(shù)據(jù)的呢?如下圖所示,3個分區(qū),3個消費者,那么哪個消費者消分哪個分區(qū)?

Kafka是如何保證高性能和高吞吐量的?

image.png

對于上面這個圖來說,這3個消費者會分別消費test這個topic 的3個分區(qū),也就是每個consumer消費一個partition。

  • 演示1(3個partiton對應3個consumer)
    ? 創(chuàng)建一個帶3個分區(qū)的topic
    ? 啟動3個消費者消費同一個topic,并且這3個consumer屬于同一個組
    ? 啟動發(fā)送者進行消息發(fā)送

演示結果:consumer1會消費partition0分區(qū)、consumer2會消費partition1分區(qū)、consumer3會消費partition2分區(qū)
如果是2個consumer消費3個partition呢?會是怎么樣的結果?

  • 演示2(3個partiton對應2個consumer)
    ? 基于上面演示的案例的topic不變
    ? 啟動2個消費這消費該topic
    ? 啟動發(fā)送者進行消息發(fā)送
    演示結果:consumer1會消費partition0/partition1分區(qū)、consumer2會消費partition2分區(qū)
  • 演示3(3個partition對應4個或以上consumer)
    演示結果:仍然只有3個consumer對應3個partition,其他的consumer無法消費消息
    通過這個演示的過程,引出接下來需要了解的kafka的分區(qū)分配策略(Partition Assignment Strategy)

consumer和partition的數(shù)量建議

  1. 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發(fā)的,所以consumer數(shù)不要大于partition數(shù)
  2. 如果consumer比partition少,一個consumer會對應于多個partitions,這里主要合理分配consumer數(shù)和partition數(shù),否則會導致partition里面的數(shù)據(jù)被取的不均勻。最好partiton數(shù)目是consumer數(shù)目的整數(shù)倍,所以partition數(shù)目很重要,比如取24,就很容易設定consumer數(shù)目
  3. 如果consumer從多個partition讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性,kafka只保證在一個partition上數(shù)據(jù)是有序的,但多個partition,根據(jù)你讀的順序會有不同
  4. 增減consumer,broker,partition會導致rebalance,所以rebalance后consumer對應的partition會發(fā)生變化

什么是分區(qū)分配策略

通過前面的案例演示,我們應該能猜到,同一個group中的消費者對于一個topic中的多個partition,存在一定的分區(qū)分配策略。
在kafka中,存在三種分區(qū)分配策略,一種是Range(默認)、 另一種是RoundRobin(輪詢)、StickyAssignor(粘性)。 在消費端中的ConsumerConfig中,通過這個屬性來指定分區(qū)分配策略

RangeAssignor(范圍分區(qū))

Range策略是對每個主題而言的,首先對同一個主題里面的分區(qū)按照序號進行排序,并對消費者按照字母順序進行排序。

假設我們有10個分區(qū),3個消費者,排完序的分區(qū)將會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序?qū)荂1-0, C2-0, C3-0。然后將partitions的個數(shù)除于消費者線程的總數(shù)來決定每個消費者線程消費幾個分區(qū)。如果除不盡,那么前面幾個消費者線程將會多消費一個分區(qū)。在我們的例子里面,我們有10個分區(qū),3個消費者線程, 10 / 3 = 3,而且除不盡,那么消費者線程 C1-0 將會多消費一個分區(qū).
結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區(qū)
C2-0 將消費 4, 5, 6 分區(qū)
C3-0 將消費 7, 8, 9 分區(qū)

假如我們有11個分區(qū),那么最后分區(qū)分配的結果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區(qū)
C2-0 將消費 4, 5, 6, 7 分區(qū)
C3-0 將消費 8, 9, 10 分區(qū)

假如我們有2個主題(T1和T2),分別有10個分區(qū),那么最后分區(qū)分配的結果看起來是這樣的:
C1-0 將消費 T1主題的 0, 1, 2, 3 分區(qū)以及 T2主題的 0, 1, 2, 3分區(qū)
C2-0 將消費 T1主題的 4, 5, 6 分區(qū)以及 T2主題的 4, 5, 6分區(qū)
C3-0 將消費 T1主題的 7, 8, 9 分區(qū)以及 T2主題的 7, 8, 9分區(qū)

可以看出,C1-0 消費者線程比其他消費者線程多消費了2個分區(qū),這就是Range strategy的一個很明顯的弊端

RoundRobinAssignor(輪詢分區(qū))

輪詢分區(qū)策略是把所有partition和所有consumer線程都列出來,然后按照hashcode進行排序。最后通過輪詢算法分配partition給消費線程。如果所有consumer實例的訂閱是相同的,那么partition會均勻分布。

在我們的例子里面,假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為C1-0, C1-1, C2-0, C2-1,最后分區(qū)分配的結果為:
C1-0 將消費 T1-5, T1-2, T1-6 分區(qū);
C1-1 將消費 T1-3, T1-1, T1-9 分區(qū);
C2-0 將消費 T1-0, T1-4 分區(qū);
C2-1 將消費 T1-8, T1-7 分區(qū);

使用輪詢分區(qū)策略必須滿足兩個條件:

  1. 每個主題的消費者實例具有相同數(shù)量的流
  2. 每個消費者訂閱的主題必須是相同的

StrickyAssignor 分配策略

kafka在0.11.x版本支持了StrickyAssignor, 翻譯過來叫粘滯策略,它主要有兩個目的:

  • 分區(qū)的分配盡可能的均勻
  • 分區(qū)的分配盡可能和上次分配保持相同

當兩者發(fā)生沖突時, 第 一 個目標優(yōu)先于第二個目標。 鑒于這兩個目標, StickyAssignor分配策略的具體實現(xiàn)要比RangeAssignor和RoundRobinAssi gn or這兩種分配策略要復雜得多,假設我們有這樣一個場景:

假設消費組有3個消費者:C0,C1,C2,它們分別訂閱了4個Topic(t0,t1,t2,t3),并且每個主題有兩個分區(qū)(p0,p1),也就是說,整個消費組訂閱了8個分區(qū):tOpO 、 tOpl 、 tlpO 、 tlpl 、 t2p0 、t2pl 、t3p0 、 t3pl
那么最終的分配場景結果為
CO: tOpO、tlpl 、 t3p0
Cl: tOpl、t2p0 、 t3pl
C2: tlpO、t2pl
這種分配方式有點類似于輪詢策略,但實際上并不是,因為假設這個時候,C1這個消費者掛了,就勢必會造成重新分區(qū)(reblance),如果是輪詢,那么結果應該是
CO: tOpO、tlpO、t2p0、t3p0
C2: tOpl、tlpl、t2pl、t3pl
然后,strickyAssignor它是一種粘滯策略,所以它會滿足`分區(qū)的分配盡可能和上次分配保持相同,所以分配結果應該是
消費者CO: tOpO、tlpl 、 t3p0、t2p0
消費者C2: tlpO、t2pl、tOpl、t3pl
也就是說,C0和C2保留了上一次是的分配結果,并且把原來C1的分區(qū)分配給了C0和C2。 這種策略的好處是使得分區(qū)發(fā)生變化時,由于分區(qū)的“粘性,減少了不必要的分區(qū)移動

誰來執(zhí)行Rebalance以及管理consumer的group呢?

Kafka提供了一個角色:coordinator來執(zhí)行對于consumer group的管理,Kafka提供了一個角色:coordinator來執(zhí)行對于consumer group的管理,當consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之后該group內(nèi)的所有成員都會和該coordinator進行協(xié)調(diào)通信

如何確定coordinator

consumer group如何確定自己的coordinator是誰呢, 消費者向kafka集群中的任意一個broker發(fā)送一個GroupCoordinatorRequest請求,服務端會返回一個負載最小的broker節(jié)點的id,并將該broker設置為coordinator

JoinGroup的過程

在rebalance之前,需要保證coordinator是已經(jīng)確定好了的,整個rebalance的過程分為兩個步驟,Join和Sync

join: 表示加入到consumer group中,在這一步中,所有的成員都會向coordinator發(fā)送joinGroup的請求。一旦所有成員都發(fā)送了joinGroup請求,那么coordinator會選擇一個consumer擔任leader角色,并把組成員信息和訂閱信息發(fā)送消費者
leader選舉算法比較簡單,如果消費組內(nèi)沒有l(wèi)eader,那么第一個加入消費組的消費者就是消費者leader,如果這個時候leader消費者退出了消費組,那么重新選舉一個leader,這個選舉很隨意,類似于隨機算法

Kafka是如何保證高性能和高吞吐量的?

image.png

protocol_metadata: 序列化后的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator會選擇一個座位leader,對應的就是member_id
member_metadata 對應消費者的訂閱信息
members:consumer group中全部的消費者的訂閱信息
generation_id: 年代信息,類似于之前講解zookeeper的時候的epoch是一樣的,對于每一輪rebalance,generation_id都會遞增。主要用來保護consumer group。隔離無效的offset提交。也就是上一輪的consumer成員無法提交offset到新的consumer group中。

每個消費者都可以設置自己的分區(qū)分配策略,對于消費組而言,會從各個消費者上報過來的分區(qū)分配策略中選舉一個彼此都贊同的策略來實現(xiàn)整體的分區(qū)分配,這個'贊同'的規(guī)則是,消費組內(nèi)的各個消費者會通過投票來決定

  • 在joingroup階段,每個consumer都會把自己支持的分區(qū)分配策略發(fā)送到coordinator
  • coordinator手機到所有消費者的分配策略,組成一個候選集
  • 每個消費者需要從候選集里找出一個自己支持的策略,并且為這個策略投票
  • 最終計算候選集中各個策略的選票數(shù),票數(shù)最多的就是當前消費組的分配策略

Synchronizing Group State階段

完成分區(qū)分配之后,就進入了Synchronizing Group State階段,主要邏輯是向GroupCoordinator發(fā)送SyncGroupRequest請求,并且處理SyncGroupResponse響應,簡單來說,就是leader將消費者對應的partition分配方案同步給consumer group 中的所有consumer

Kafka是如何保證高性能和高吞吐量的?

image.png

每個消費者都會向coordinator發(fā)送syncgroup請求,不過只有l(wèi)eader節(jié)點會發(fā)送分配方案,其他消費者只是打打醬油而已。當leader把方案發(fā)給coordinator以后,coordinator會把結果設置到SyncGroupResponse中。這樣所有成員都知道自己應該消費哪個分區(qū)。

consumer group的分區(qū)分配方案是在客戶端執(zhí)行的!Kafka將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性

總結

我們再來總結一下consumer group rebalance的過程
? 對于每個consumer group子集,都會在服務端對應一個GroupCoordinator進行管理,GroupCoordinator會在zookeeper上添加watcher,當消費者加入或者退出consumer group時,會修改zookeeper上保存的數(shù)據(jù),從而觸發(fā)GroupCoordinator開始Rebalance操作
? 當消費者準備加入某個Consumer group或者GroupCoordinator發(fā)生故障轉(zhuǎn)移時,消費者并不知道GroupCoordinator的在網(wǎng)絡中的位置,這個時候就需要確定GroupCoordinator,消費者會向集群中的任意一個Broker節(jié)點發(fā)送ConsumerMetadataRequest請求,收到請求的broker會返回一個response作為響應,其中包含管理當前ConsumerGroup的GroupCoordinator,
? 消費者會根據(jù)broker的返回信息,連接到groupCoordinator,并且發(fā)送HeartbeatRequest,發(fā)送心跳的目的是要要奧噶蘇GroupCoordinator這個消費者是正常在線的。當消費者在指定時間內(nèi)沒有發(fā)送心跳請求,則GroupCoordinator會觸發(fā)Rebalance操作。

? 發(fā)起join group請求,兩種情況

  • 如果GroupCoordinator返回的心跳包數(shù)據(jù)包含異常,說明GroupCoordinator因為前面說的幾種情況導致了Rebalance操作,那這個時候,consumer會發(fā)起join group請求
  • 新加入到consumer group的consumer確定好了GroupCoordinator以后消費者會向GroupCoordinator發(fā)起join group請求,GroupCoordinator會收集全部消費者信息之后,來確認可用的消費者,并從中選取一個消費者成為group_leader。并把相應的信息(分區(qū)分配策略、leader_id、…)封裝成response返回給所有消費者,但是只有group leader會收到當前consumer group中的所有消費者信息。當消費者確定自己是group leader以后,會根據(jù)消費者的信息以及選定分區(qū)分配策略進行分區(qū)分配
  • 接著進入Synchronizing Group State階段,每個消費者會發(fā)送SyncGroupRequest請求到GroupCoordinator,但是只有Group Leader的請求會存在分區(qū)分配結果,GroupCoordinator會根據(jù)Group Leader的分區(qū)分配結果形成SyncGroupResponse返回給所有的Consumer。
  • consumer根據(jù)分配結果,執(zhí)行相應的操作

到這里為止,我們已經(jīng)知道了消息的發(fā)送分區(qū)策略,以及消費者的分區(qū)消費策略和rebalance。對于應用層面來說,還有一個最重要的東西沒有講解,就是offset,他類似一個游標,表示當前消費的消息的位置。

如何保存消費端的消費位置

什么是offset

前面在講解partition的時候,提到過offset, 每個topic可以劃分多個分區(qū)(每個Topic至少有一個分區(qū)),同一topic下的不同分區(qū)包含的消息是不同的。每個消息在被添加到分區(qū)時,都會被分配一個offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號,kafka通過offset保證消息在分區(qū)內(nèi)的順序,offset的順序不跨分區(qū),即kafka只保證在同一個分區(qū)內(nèi)的消息是有序的; 對于應用層的消費來說,每次消費一個消息并且提交以后,會保存當前消費到的最近的一個offset。那么offset保存在哪里?

Kafka是如何保證高性能和高吞吐量的?

image.png

offset在哪里維護?

在kafka中,提供了一個consumer_offsets_* 的一個topic,把offset信息寫入到這個topic中。
consumer_offsets——按保存了每個consumer group某一時刻提交的offset信息。
__consumer_offsets 默認有50個分區(qū)。
根據(jù)前面我們演示的案例,我們設置了一個KafkaConsumerDemo的groupid。首先我們需要找到這個consumer_group保存在哪個分區(qū)中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,'KafkaConsumerDemo');
計算公式:
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默認情況下groupMetadataTopicPartitionCount有50個分區(qū),計算得到的結果為:35, 意味著當前的consumer_group的位移信息保存在__consumer_offsets的第35個分區(qū)
執(zhí)行如下命令,可以查看當前consumer_goup中的offset位移提交的信息

從輸出結果中,我們就可以看到test這個topic的offset的位移日志

分區(qū)的副本機制

我們已經(jīng)知道Kafka的每個topic都可以分為多個Partition,并且多個partition會均勻分布在集群的各個節(jié)點下。雖然這種方式能夠有效的對數(shù)據(jù)進行分片,但是對于每個partition來說,都是單點的,當其中一個partition不可用的時候,那么這部分消息就沒辦法消費。所以kafka為了提高partition的可靠性而提供了副本的概念(Replica),通過副本機制來實現(xiàn)冗余備份。

每個分區(qū)可以有多個副本,并且在副本集合中會存在一個leader的副本,所有的讀寫請求都是由leader副本來進行處理。剩余的其他副本都做為follower副本,follower副本會從leader副本同步消息日志。
這個有點類似zookeeper中l(wèi)eader和follower的概念,但是具體的時間方式還是有比較大的差異。所以我們可以認為,副本集會存在一主多從的關系。

一般情況下,同一個分區(qū)的多個副本會被均勻分配到集群中的不同broker上,當leader副本所在的broker出現(xiàn)故障后,可以重新選舉新的leader副本繼續(xù)對外提供服務。通過這樣的副本機制來提高kafka集群的可用性。

創(chuàng)建一個帶副本機制的topic

通過下面的命令去創(chuàng)建帶2個副本的topic

然后我們可以在/tmp/kafka-log路徑下看到對應topic的副本信息了。我們通過一個圖形的方式來表達。
針對secondTopic這個topic的3個分區(qū)對應的3個副本

Kafka是如何保證高性能和高吞吐量的?

image.png

如何知道那個各個分區(qū)中對應的leader是誰呢?

在zookeeper服務器上,通過如下命令去獲取對應分區(qū)的信息, 比如下面這個是獲取secondTopic第1個
分區(qū)的狀態(tài)信息。

{'controller_epoch':12,'leader':0,'version':1,'leader_epoch':0,'isr':[0,1]}
或通過這個命令

leader表示當前分區(qū)的leader是那個broker-id。下圖中。綠色線條的表示該分區(qū)中的leader節(jié)點。其他節(jié)點就為follower

Kafka是如何保證高性能和高吞吐量的?

image.png

需要注意的是,kafka集群中的一個broker中最多只能有一個副本,leader副本所在的broker節(jié)點的分區(qū)叫l(wèi)eader節(jié)點,follower副本所在的broker節(jié)點的分區(qū)叫follower節(jié)點

副本的leader選舉

Kafka提供了數(shù)據(jù)復制算法保證,如果leader副本所在的broker節(jié)點宕機或者出現(xiàn)故障,或者分區(qū)的leader節(jié)點發(fā)生故障,這個時候怎么處理呢?
那么,kafka必須要保證從follower副本中選擇一個新的leader副本。那么kafka是如何實現(xiàn)選舉的呢?
要了解leader選舉,我們需要了解幾個概念
Kafka分區(qū)下有可能有很多個副本(replica)用于實現(xiàn)冗余,從而進一步實現(xiàn)高可用。副本根據(jù)角色的不同可分為3類:

  • leader副本:響應clients端讀寫請求的副本
  • follower副本:被動地備份leader副本中的數(shù)據(jù),不能響應clients端讀寫請求。
  • ISR副本:包含了leader副本和所有與leader副本保持同步的follower副本——如何判定是否與leader同步后面會提到每個Kafka副本對象都有兩個重要的屬性:LEO和HW。注意是所有的副本,而不只是leader副本。
  • LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,leader LEO和follower LEO的更新是有區(qū)別的。我們后面會詳細說
  • HW:即上面提到的水位值。對于同一個副本對象而言,其HW值不會大于LEO值。小于等于HW值的所有消息都被認為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區(qū)別的
    從生產(chǎn)者發(fā)出的 一 條消息首先會被寫入分區(qū)的leader 副本,不過還需要等待ISR集合中的所有follower副本都同步完之后才能被認為已經(jīng)提交,之后才會更新分區(qū)的HW, 進而消費者可以消費到這條消息。

副本協(xié)同機制

剛剛提到了,消息的讀寫操作都只會由leader節(jié)點來接收和處理。follower副本只負責同步數(shù)據(jù)以及當leader副本所在的broker掛了以后,會從follower副本中選取新的leader。

寫請求首先由Leader副本處理,之后follower副本會從leader上拉取寫入的消息,這個過程會有一定的延遲,導致follower副本中保存的消息略少于leader副本,但是只要沒有超出閾值都可以容忍。但是如果一個follower副本出現(xiàn)異常,比如宕機、網(wǎng)絡斷開等原因長時間沒有同步到消息,那這個時候,leader就會把它踢出去。kafka通過ISR集合來維護一個分區(qū)副本信息

Kafka是如何保證高性能和高吞吐量的?

image.png

一個新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader;leader負責維護和跟蹤ISR(in-Sync replicas , 副本同步隊列)中所有follower滯后的狀態(tài)。當producer發(fā)送一條消息到broker后,leader寫入消息并復制到所有follower。消息提交之后才被成功復制到所有的同步副本。

ISR

ISR表示目前“可用且消息量與leader相差不多的副本集合,這是整個副本集合的一個子集”。怎么去理解可用和相差不多這兩個詞呢?具體來說,ISR集合中的副本必須滿足兩個條件:

  1. 副本所在節(jié)點必須維持著與zookeeper的連接
  2. 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過指定的閾值
    (replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時間間隔內(nèi)一直沒有追上過leader的所有消息,則該follower就會被剔除isr列表
  3. ISR數(shù)據(jù)保存在Zookeeper的 /brokers/topics//partitions//state 節(jié)點中

follower副本把leader副本LEO之前的日志全部同步完成時,則認為follower副本已經(jīng)追趕上了leader副本,這個時候會更新這個副本的lastCaughtUpTimeMs標識,kafk副本管理器會啟動一個副本過期檢查的定時任務,這個任務會定期檢查當前時間與副本的lastCaughtUpTimeMs的差值是否大于參數(shù)replica.lag.time.max.ms 的值,如果大于,則會把這個副本踢出ISR集合

Kafka是如何保證高性能和高吞吐量的?

image.png

如何處理所有的Replica不工作的情況

在ISR中至少有一個follower時,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數(shù)據(jù)不丟失了

  1. 等待ISR中的任一個Replica“活”過來,并且選它作為Leader
  2. 選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader

這就需要在可用性和一致性當中作出一個簡單的折衷。
如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數(shù)據(jù)都丟失了,這個Partition將永遠不可用。
選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會成為Leader而作為consumer的數(shù)據(jù)源(所有讀寫都由Leader完成)。在我們課堂講的版本中,使用的是第一種策略。

副本數(shù)據(jù)同步原理

了解了副本的協(xié)同過程以后,還有一個最重要的機制,就是數(shù)據(jù)的同步過程。它需要解決

  1. 怎么傳播消息
  2. 在向消息發(fā)送端返回ack之前需要保證多少個Replica已經(jīng)接收到這個消息

下圖中,深紅色部分表示test_replica分區(qū)的leader副本,另外兩個節(jié)點上淺色部分表示follower副本

Kafka是如何保證高性能和高吞吐量的?

image.png

Producer在發(fā)布消息到某個Partition時,

  • 先通過ZooKeeper找到該Partition的Leader get /brokers/topics//partitions/2/state ,然后無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該消息發(fā)送到該Partition的Leader。
  • Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數(shù)據(jù)。這種方式上,F(xiàn)ollower存儲的數(shù)據(jù)順序與Leader保持一致。
  • Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK。
  • 一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經(jīng)commit了,Leader將增加HW(HighWatermark)并且向Producer發(fā)送ACK。

LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,leader LEO和follower LEO的更新是有區(qū)別的。

HW:即上面提到的水位值(Hight Water)。對于同一個副本對象而言,其HW值不會大于LEO值。小于等于HW值的所有消息都被認為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區(qū)別的

通過下面這幅圖來表達LEO、HW的含義,隨著follower副本不斷和leader副本進行數(shù)據(jù)同步,follower副本的LEO會主鍵后移并且追趕到leader副本,這個追趕上的判斷標準是當前副本的LEO是否大于或者等于leader副本的HW,這個追趕上也會使得被踢出的follower副本重新加入到ISR集合中。
另外, 假如說下圖中的最右側(cè)的follower副本被踢出ISR集合,也會導致這個分區(qū)的HW發(fā)生變化,變成了3

Kafka是如何保證高性能和高吞吐量的?

image.png

數(shù)據(jù)丟失的問題

表達的含義是,至少需要多少個副本同步才能表示消息是提交的, 所以,當 min.insync.replicas=1的時候,一旦消息被寫入leader端log即被認為是“已提交”,而延遲一輪FETCH RPC更新HW值的設計使得follower HW值是異步延遲更新的,倘若在這個過程中l(wèi)eader發(fā)生變更,那么成為新leader的follower的HW值就有可能是過期的,使得clients端認為是成功提交的消息被刪除。

Kafka是如何保證高性能和高吞吐量的?

image.png

acks配置表示producer發(fā)送消息到broker上以后的確認值。有三個可選項
0:表示producer不需要等待broker的消息確認。這個選項時延最小但同時風險最大(因為當server宕機時,數(shù)據(jù)將會丟失)。
1:表示producer只需要獲得kafka集群中的leader節(jié)點確認即可,這個選擇時延較小同時確保了leader節(jié)點確認接收成功。
all(-1):需要ISR中所有的Replica給予接收確認,速度最慢,安全性最高,但是由于ISR可能會縮小到僅包含一個Replica,所以設置參數(shù)為all并不能一定避免數(shù)據(jù)丟失,

數(shù)據(jù)丟失的解決方案

在kafka0.11.0.0版本之后,引入了一個leader epoch來解決這個問題,所謂的leader epoch實際上是一對值(epoch,offset),epoch代表leader的版本號,從0開始遞增,當leader發(fā)生過變更,epoch就+1,而offset則是對應這個epoch版本的leader寫入第一條消息的offset,比如
(0,0), (1,50) ,表示第一個leader從offset=0開始寫消息,一共寫了50條。第二個leader版本號是1,從offset=50開始寫,這個信息會持久化在對應的分區(qū)的本地磁盤上,文件名是 /tmp/kafka-log/topic/leader-epoch-checkpoint 。
leader broker中會保存這樣一個緩存,并且定期寫入到checkpoint文件中
當leader寫log時它會嘗試更新整個緩存: 如果這個leader首次寫消息,則會在緩存中增加一個條目;否則就不做更新。而每次副本重新成為leader時會查詢這部分緩存,獲取出對應leader版本的offset

我們基于同樣的情況來分析,follower宕機并且恢復之后,有兩種情況,如果這個時候leader副本沒有掛,也就是意味著沒有發(fā)生leader選舉,那么follower恢復之后并不會去截斷自己的日志,而是先發(fā)送一個OffsetsForLeaderEpochRequest請求給到leader副本,leader副本收到請求之后返回當前的LEO。
如果follower副本的leaderEpoch和leader副本的epoch相同, leader的leo只可能大于或者等于follower副本的leo值,所以這個時候不會發(fā)生截斷
如果follower副本和leader副本的epoch值不同,那么leader副本會查找follower副本傳過來的epoch+1在本地文件中存儲的StartOffset返回給follower副本,也就是新leader副本的LEO。這樣也避免了數(shù)據(jù)丟失的問題
如果leader副本宕機了重新選舉新的leader,那么原本的follower副本就會變成leader,意味著epoch從0變成1,使得原本follower副本中LEO的值的到了保留。

Leader副本的選舉過程

  1. KafkaController會監(jiān)聽ZooKeeper的/brokers/ids節(jié)點路徑,一旦發(fā)現(xiàn)有broker掛了,執(zhí)行下面的邏輯。這里暫時先不考慮KafkaController所在broker掛了的情況,KafkaController掛了,各個broker會重新leader選舉出新的KafkaController
  2. leader副本在該broker上的分區(qū)就要重新進行l(wèi)eader選舉,目前的選舉策略是
    a) 優(yōu)先從isr列表中選出第一個作為leader副本,這個叫優(yōu)先副本,理想情況下有限副本就是該分區(qū)的leader副本
    b) 如果isr列表為空,則查看該topic的unclean.leader.election.enable配置。
    unclean.leader.election.enable:為true則代表允許選用非isr列表的副本作為leader,那么此時就意味著數(shù)據(jù)可能丟失,為false的話,則表示不允許,直接拋出NoReplicaOnlineException異常,造成leader副本選舉失敗。
    c) 如果上述配置為true,則從其他副本中選出一個作為leader副本,并且isr列表只包含該leader副本。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區(qū)的對應的zk路徑上。

消息的存儲

消息發(fā)送端發(fā)送消息到broker上以后,消息是如何持久化的呢?那么接下來去分析下消息的存儲首先我們需要了解的是,kafka是使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個offset值來表示它在分區(qū)中的偏移量。Kafka中存儲的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,Log并不是直接對應在一個磁盤上的日志文件,而是對應磁盤上的一個目錄,這個目錄的命名規(guī)則是_

消息的文件存儲機制

一個topic的多個partition在物理磁盤上的保存路徑,路徑保存在 /tmp/kafka-logs/topic_partition,包含日志文件、索引文件和時間索引文件

Kafka是如何保證高性能和高吞吐量的?

image.png

kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個LogSegment對應磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的。索引文件是用來保存消息的索引。那么這個LogSegment是什么呢?

LogSegment

假設kafka以partition為最小存儲單位,那么我們可以想象當kafka producer不斷發(fā)送消息,必然會引起partition文件的無線擴張,這樣對于消息文件的維護以及被消費的消息的清理帶來非常大的挑戰(zhàn),所以kafka 以segment為單位又把partition進行細分。每個partition相當于一個巨型文件被平均分配到多個大小相等的segment數(shù)據(jù)文件中(每個segment文件中的消息不一定相等),這種特性方便已經(jīng)被消費的消息的清理,提高磁盤的利用率。

  • log.segment.bytes=107370 (設置分段大小),默認是1gb,我們把這個值調(diào)小以后,可以看到日志分段的效果
  • 抽取其中3個分段來進行分析
    image.png
    segment file由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現(xiàn),后綴'.index'和“.log”分別表示為segment索引文件、數(shù)據(jù)文件.
    segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值進行遞增。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字用0填充

查看segment文件命名規(guī)則

通過下面這條命令可以看到kafka消息日志的內(nèi)容

假如第一個log文件的最后一個offset為:5376,所以下一個segment的文件命名為:
00000000000000005376.log。對應的index為00000000000000005376.index

segment中index和log的對應關系

從所有分段中,找一個分段進行分析
為了提高查找消息的性能,為每一個日志文件添加2個索引索引文件:OffsetIndex 和 TimeIndex,分別對應.index以及.timeindex, TimeIndex索引文件格式:它是映射時間戳和相對offset
查看索引內(nèi)容:

Kafka是如何保證高性能和高吞吐量的?

image.png

如圖所示,index中存儲了索引以及物理偏移量。 log存儲了消息的內(nèi)容。索引文件的元數(shù)據(jù)執(zhí)行對應數(shù)據(jù)文件中message的物理偏移地址。舉個簡單的案例來說,以[4053,80899]為例,在log文件中,對應的是第4053條記錄,物理偏移量(position)為80899. position是ByteBuffer的指針位置

在partition中如何通過offset查找message

查找的算法是

  1. 根據(jù)offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一個文件的最后一個offset進行命名的,所以,使用二分查找算法能夠根據(jù)offset快速定位到指定的索引文件。
  2. 找到索引文件后,根據(jù)offset進行定位,找到索引文件中的符合范圍的索引。(kafka采用稀疏索引的方式來提高查找性能)
  3. 得到position以后,再到對應的log文件中,從position出開始查找offset對應的消息,將每條消息的offset與目標offset進行比較,直到找到消息

比如說,我們要查找offset=2490這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個索引,再到log文件中,根據(jù)49111這個position開始查找,比較每條消息的offset是否大于等于2490。最后查找到對應的消息以后返回

Log文件的消息內(nèi)容分析

前面我們通過kafka提供的命令,可以查看二進制的日志文件信息,一條消息,會包含很多的字段。

offset和position這兩個前面已經(jīng)講過了、 createTime表示創(chuàng)建時間、keysize和valuesize表示key和value的大小、 compresscodec表示壓縮編碼、payload:表示消息的具體內(nèi)容

日志的清除策略以及壓縮策略

日志清除策略

前面提到過,日志的分段存儲,一方面能夠減少單個文件內(nèi)容的大小,另一方面,方便kafka進行日志清理。日志的清理策略有兩個:

  1. 根據(jù)消息的保留時間,當消息在kafka中保存的時間超過了指定的時間,就會觸發(fā)清理過程
  2. 根據(jù)topic存儲的數(shù)據(jù)大小,當topic所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息。kafka會啟動一個后臺線程,定期檢查是否存在可以刪除的消息

通過log.retention.bytes和log.retention.hours這兩個參數(shù)來設置,當其中任意一個達到要求,都會執(zhí)行刪除。
默認的保留時間是:7天

日志壓縮策略

Kafka還提供了“日志壓縮(Log Compaction)”功能,通過這個功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實際場景中,消息的key和value的值之間的對應關系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費者只關心key對應的最新的value。因此,我們可以開啟kafka的日志壓縮功能,服務端會在后臺啟動啟動Cleaner線程池,定期將相同的key進行合并,只保留最新的value值。日志的壓縮原理是

Kafka是如何保證高性能和高吞吐量的?

image.png

磁盤存儲的性能問題

磁盤存儲的性能優(yōu)化

我們現(xiàn)在大部分企業(yè)仍然用的是機械結構的磁盤,如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對應的柱面、磁頭以及對應的扇區(qū);這個過程相對內(nèi)存來說會消耗大量時間,為了規(guī)避隨機讀寫帶來的時間消耗,kafka采用順序?qū)懙姆绞酱鎯?shù)據(jù)。即使是這樣,但是頻繁的I/O操作仍然會造成磁盤的性能瓶頸

零拷貝

消息從發(fā)送到落地保存,broker維護的消息日志本身就是文件目錄,每個文件都是二進制保存,生產(chǎn)者和消費者使用相同的格式來處理。在消費者獲取消息時,服務器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過socket發(fā)送給消費者。雖然這個操作描述起來很簡單,但實際上經(jīng)歷了很多步驟。

操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存:
? 應用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
? 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡發(fā)出

Kafka是如何保證高性能和高吞吐量的?

image.png

通過“零拷貝”技術,可以去掉這些沒必要的數(shù)據(jù)復制操作,同時也會減少上下文切換次數(shù)?,F(xiàn)代的unix操作系統(tǒng)提供一個優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)絪ocket;在Linux中,是通過sendfile系統(tǒng)調(diào)用來完成的。Java提供了訪問這個系統(tǒng)調(diào)用的方法:FileChannel.transferTo API
使用sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡上。所以在這個優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的

Kafka是如何保證高性能和高吞吐量的?

image.png

頁緩存

頁緩存是操作系統(tǒng)實現(xiàn)的一種主要的磁盤緩存,但凡設計到緩存的,基本都是為了提升i/o性能,所以頁緩存是用來減少磁盤I/O操作的。
磁盤高速緩存有兩個重要因素:
第一,訪問磁盤的速度要遠低于訪問內(nèi)存的速度,若從處理器L1和L2高速緩存訪問則速度更快。
第二,數(shù)據(jù)一旦被訪問,就很有可能短時間內(nèi)再次訪問。正是由于基于訪問內(nèi)存比磁盤快的多,所以磁盤的內(nèi)存緩存將給系統(tǒng)存儲性能帶來質(zhì)的飛越。

當 一 個進程準備讀取磁盤上的文件內(nèi)容時, 操作系統(tǒng)會先查看待讀取的數(shù)據(jù)所在的頁(page)是否在頁緩存(pagecache)中,如果存在(命中)則直接返回數(shù)據(jù), 從而避免了對物理磁盤的I/0操作;如果沒有命中, 則操作系統(tǒng)會向磁盤發(fā)起讀取請求并將讀取的數(shù)據(jù)頁存入頁緩存, 之后再將數(shù)據(jù)返回給進程。
同樣,如果 一 個進程需要將數(shù)據(jù)寫入磁盤, 那么操作系統(tǒng)也會檢測數(shù)據(jù)對應的頁是否在頁緩存中,如果不存在, 則會先在頁緩存中添加相應的頁, 最后將數(shù)據(jù)寫入對應的頁。 被修改過后的頁也就變成了臟頁, 操作系統(tǒng)會在合適的時間把臟頁中的數(shù)據(jù)寫入磁盤, 以保持數(shù)據(jù)的 一 致性
Kafka中大量使用了頁緩存, 這是Kafka實現(xiàn)高吞吐的重要因素之 一 。 雖然消息都是先被寫入頁緩存,然后由操作系統(tǒng)負責具體的刷盤任務的, 但在Kafka中同樣提供了同步刷盤及間斷性強制刷盤(fsync),可以通過 log.flush.interval.messages 和 log.flush.interval.ms 參數(shù)來控制。
同步刷盤能夠保證消息的可靠性,避免因為宕機導致頁緩存數(shù)據(jù)還未完成同步時造成的數(shù)據(jù)丟失。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會帶來性能的影響。 刷盤的操作由操作系統(tǒng)去完成即可

Kafka消息的可靠性

沒有一個中間件能夠做到百分之百的完全可靠,可靠性更多的還是基于幾個9的衡量指標,比如4個9、5個9. 軟件系統(tǒng)的可靠性只能夠無限去接近100%,但不可能達到100%。所以kafka如何是實現(xiàn)最大可能的可靠性呢?

  • 分區(qū)副本, 你可以創(chuàng)建更多的分區(qū)來提升可靠性,但是分區(qū)數(shù)過多也會帶來性能上的開銷,一般來說,3個副本就能滿足對大部分場景的可靠性要求
  • acks,生產(chǎn)者發(fā)送消息的可靠性,也就是我要保證我這個消息一定是到了broker并且完成了多副本的持久化,但這種要求也同樣會帶來性能上的開銷。它有幾個可選項:
    1 ,生產(chǎn)者把消息發(fā)送到leader副本,leader副本在成功寫入到本地日志之后就告訴生產(chǎn)者消息提交成功,但是如果isr集合中的follower副本還沒來得及同步leader副本的消息,leader掛了,就會造成消息丟失。
    -1 ,消息不僅僅寫入到leader副本,并且被ISR集合中所有副本同步完成之后才告訴生產(chǎn)者已經(jīng)提交成功,這個時候即使leader副本掛了也不會造成數(shù)據(jù)丟失。
    0:表示producer不需要等待broker的消息確認。這個選項時延最小但同時風險最大(因為當server宕機時,數(shù)據(jù)將會丟失)。
  • 保障消息到了broker之后,消費者也需要有一定的保證,因為消費者也可能出現(xiàn)某些問題導致消息沒有消費到。
  • enable.auto.commit默認為true,也就是自動提交offset,自動提交是批量執(zhí)行的,有一個時間窗口,這種方式會帶來重復提交或者消息丟失的問題,所以對于高可靠性要求的程序,要使用手動提交。 對于高可靠要求的應用來說,寧愿重復消費也不應該因為消費異常而導致消息丟失

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多