一、為什么要做隨著馬蜂窩的逐漸發(fā)展,我們的業(yè)務(wù)數(shù)據(jù)越來越多,單純使用 MySQL 已經(jīng)不能滿足我們的數(shù)據(jù)查詢需求,例如對于商品、訂單等數(shù)據(jù)的多維度檢索。 使用 Elasticsearch 存儲業(yè)務(wù)數(shù)據(jù)可以很好的解決我們業(yè)務(wù)中的搜索需求。而數(shù)據(jù)進(jìn)行異構(gòu)存儲后,隨之而來的就是數(shù)據(jù)同步的問題。 二、現(xiàn)有方法及問題對于數(shù)據(jù)同步,我們目前的解決方案是建立數(shù)據(jù)中間表。把需要檢索的業(yè)務(wù)數(shù)據(jù),統(tǒng)一放到一張MySQL 表中,這張中間表對應(yīng)了業(yè)務(wù)需要的Elasticsearch 索引,每一列對應(yīng)索引中的一個Mapping 字段。通過腳本以 Crontab 的方式,讀取 MySQL 中間表中 UTime 大于上一次讀取時(shí)間的所有數(shù)據(jù),即該段時(shí)間內(nèi)的增量,寫入Elasticsearch。 所以,一旦業(yè)務(wù)邏輯中有相應(yīng)字段的數(shù)據(jù)變更,需要同時(shí)顧及 MySQL 中間表的變更;如果需要 Elasticsearch 中的數(shù)據(jù)即時(shí)性較高,還需要同時(shí)寫入 Elasticsearch。 隨著業(yè)務(wù)數(shù)據(jù)越來越多,MySQL 中間表的數(shù)據(jù)量越來越大。當(dāng)需要在 Elasticsearch 的索引中新增 Mapping 字段時(shí),相應(yīng)的 MySQL 中間表也需要新增列,在數(shù)據(jù)量龐大的表中,擴(kuò)展列的耗時(shí)是難以忍受的。 而且 Elasticsearch 索引中的 Mapping 字段隨著業(yè)務(wù)發(fā)展增多,需要由業(yè)務(wù)方增加相應(yīng)的寫入 MySQL 中間表方法,這也帶來一部分開發(fā)成本。 三、方案設(shè)計(jì)3.1、 整體思路目前現(xiàn)有的一些開源數(shù)據(jù)同步工具,如阿里的 DataX 等,主要是基于查詢來獲取數(shù)據(jù)源,這會存在如何確定增量(比如使用utime字段解決等)和輪詢頻率的問題,而我們一些業(yè)務(wù)場景對于數(shù)據(jù)同步的實(shí)時(shí)性要求比較高。為了解決上述問題,我們提出了一種基于 MySQL Binlog 來進(jìn)行 MySQL 數(shù)據(jù)同步到 Elasticsearch 的思路。Binlog 是 MySQL 通過 Replication 協(xié)議用來做主從數(shù)據(jù)同步的數(shù)據(jù),所以它有我們需要寫入 Elasticsearch 的數(shù)據(jù),并符合對數(shù)據(jù)同步時(shí)效性的要求。 使用 Binlog 數(shù)據(jù)同步 Elasticsearch,業(yè)務(wù)方就可以專注于業(yè)務(wù)邏輯對 MySQL 的操作,不用再關(guān)心數(shù)據(jù)向 Elasticsearch 同步的問題,減少了不必要的同步代碼,避免了擴(kuò)展中間表列的長耗時(shí)問題。 經(jīng)過調(diào)研后,我們采用開源項(xiàng)目 go-mysql-elasticsearch 實(shí)現(xiàn)數(shù)據(jù)同步,并針對馬蜂窩技術(shù)棧和實(shí)際的業(yè)務(wù)環(huán)境進(jìn)行了一些定制化開發(fā)。 3.2、 數(shù)據(jù)同步正確性保證公司的所有表的 Binlog 數(shù)據(jù)屬于機(jī)密數(shù)據(jù),不能直接獲取,為了滿足各業(yè)務(wù)線的使用需求,采用接入 Kafka 的形式提供給使用方,并且需要使用方申請相應(yīng)的 Binlog 數(shù)據(jù)使用權(quán)限。獲取使用權(quán)限后,使用方以 Consumer Group 的形式讀取。 這種方式保證了 Binglog 數(shù)據(jù)的安全性,但是對保證數(shù)據(jù)同步的正確性帶來了挑戰(zhàn)。因此我們設(shè)計(jì)了一些機(jī)制,來保證數(shù)據(jù)源的獲取有序、完整。 1). 順序性 通過 Kafka 獲取 Binlog 數(shù)據(jù),首先需要保證獲取數(shù)據(jù)的順序性。嚴(yán)格說,Kafka 是無法保證全局消息有序的,只能局部有序,所以無法保證所有 Binlog 數(shù)據(jù)都可以有序到達(dá) Consumer。 但是每個 Partition 上的數(shù)據(jù)是有序的。為了可以按順序拿到每一行 MySQL 記錄的 Binglog,我們把每條 Binlog 按照其 Primary Key,Hash 到各個 Partition 上,保證同一條 MySQL 記錄的所有 Binlog 數(shù)據(jù)都發(fā)送到同一個 Partition。 如果是多 Consumer 的情況,一個 Partition 只會分配給一個 Consumer,同樣可以保證 Partition 內(nèi)的數(shù)據(jù)可以有序的 Update 到 Elasticsearch 中。 2). 完整性 考慮到同步程序可能面臨各種正?;虍惓5耐顺?,以及 Consumer 數(shù)量變化時(shí)的 Rebalance,我們需要保證在任何情況下不能丟失 Binlog 數(shù)據(jù)。 利用 Kafka 的 Offset 機(jī)制,在確認(rèn)一條 Message 數(shù)據(jù)成功寫入 Elasticsearch 后,才 Commit 該條 Message 的 Offset,這樣就保證了數(shù)據(jù)的完整性。而對于數(shù)據(jù)同步的使用場景,在保證了數(shù)據(jù)順序性和完整性的情況下,重復(fù)消費(fèi)是不會有影響的。 四、技術(shù)實(shí)現(xiàn)黃色箭頭表示依賴,藍(lán)色箭頭表示數(shù)據(jù)流向 4.1、 功能模塊·配置解析模塊 負(fù)責(zé)解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日志記錄方式配置、MySQL 庫表及字段與 Elasticsearch 的 Index 和 Mapping 對應(yīng)關(guān)系配置等。 ·規(guī)則模塊 規(guī)則模塊決定了一條 Binlog 數(shù)據(jù)應(yīng)該寫入到哪個 Elasticsearch 索引、文檔_id 對應(yīng)的 MySQL 字段、Binlog 中的各個 MySQL 字段與索引 Mapping 的對應(yīng)關(guān)系和寫入類型等。 在本地化過程中,根據(jù)我們的業(yè)務(wù)場景,增加了對 MySQL 表各字段的 where 條件判斷,來過濾掉不需要的 Binlog 數(shù)據(jù)。 ·Kafka 相關(guān)模塊 該模塊負(fù)責(zé)連接 Kafka 集群,獲取 Binlog 數(shù)據(jù)。在本地化過程中,該模塊的大部分功能已經(jīng)封裝成了一個通用的 Golang Kafka Consumer Client。包括 Dba Binlog 訂閱平臺要求的 SASL 認(rèn)證,以及從指定時(shí)間點(diǎn)的 Offset 開始消費(fèi)數(shù)據(jù)。 ·Binlog 數(shù)據(jù)解析模塊 原項(xiàng)目中的 Binlog 數(shù)據(jù)解析針對的是原始的 Binlog 數(shù)據(jù),包含了解析 Replication 協(xié)議的實(shí)現(xiàn)。在我們的使用場景中,Binlog 數(shù)據(jù)已經(jīng)是由 canal 解析成的 json 字符串,所以對該模塊的功能進(jìn)行了簡化。 binlog json字符串示例 上面是一個簡化的 binlog json 字符串,通過該條 binlog 的 database 和 table 可以命中一條配置規(guī)則,根據(jù)該配置規(guī)則,把 Data 中的 key-value 構(gòu)造成一個與對應(yīng) Elasticsearch 索引相匹配的 key-value map,同時(shí)包括一些數(shù)據(jù)類型的轉(zhuǎn)換:
·Elasticsearch相關(guān)模塊 Binlog 數(shù)據(jù)解析模塊生成的 key-value map,由該模塊拼裝成請求_bulk 接口的 update payload,寫入 Elasticsearch??紤]到 MySQL 頻繁更新時(shí)對 Elasticsearch 的寫入壓力,key-value map 會暫存到一個 slice 中,每 200ms 或 slice 長度達(dá)到一定長度時(shí)(可以通過配置調(diào)整),才會調(diào)用 Elasticsearch 的_bulk 接口,寫入數(shù)據(jù)。 4.2、定制化開發(fā)1). 適應(yīng)業(yè)務(wù)需求 ·upsert 業(yè)務(wù)中使用的索引數(shù)據(jù)可能是來自多個不同的表,同一個文檔的數(shù)據(jù)來自不同表的時(shí)候,先到的數(shù)據(jù)是一條 index,后到的數(shù)據(jù)是一條 update,在我們無法控制先后順序時(shí),需要實(shí)現(xiàn) upsert 功能。在_bulk 參數(shù)中加入
·Filter 實(shí)際業(yè)務(wù)場景中,可能業(yè)務(wù)需要的數(shù)據(jù)只是某張表中的部分?jǐn)?shù)據(jù),比如用 type 字段標(biāo)識該條數(shù)據(jù)來源,只需要把 type=1或2的數(shù)據(jù)同步到 Elasticsearch 中。我們擴(kuò)展了規(guī)則配置,可以支持對 Binlog 指定字段的過濾需求,類似
2). 快速增量 數(shù)據(jù)同步一般分為全量和增量。接入一個業(yè)務(wù)時(shí),首先需要把業(yè)務(wù)現(xiàn)有的歷史 MySQL 數(shù)據(jù)導(dǎo)入到 Elasticsearch 中,這部分為全量同步。在全量同步過程中以及后續(xù)增加的數(shù)據(jù)為增量數(shù)據(jù)。 在全量數(shù)據(jù)同步完成后,如果從最舊開始消費(fèi) Kafka,隊(duì)列數(shù)據(jù)量很大的情況下,需要很長時(shí)間增量數(shù)據(jù)才能追上當(dāng)前進(jìn)度。為了更快的拿到所需的增量 Binlog,在 Consumer Group 消費(fèi) Kafka 之前,先獲取各個 Topic 的 Partition 在指定時(shí)間的 offset 值,并 commit 這些 offset,這樣在 Consumer Group 連接 Kafka 集群時(shí),會從剛才提交的 offset 開始消費(fèi),可以立即拿到所需的增量 Binlog。 3). 微服務(wù)和配置中心 項(xiàng)目使用馬蜂窩微服務(wù)部署,為新接入業(yè)務(wù)提供了快速上線支持,并且在業(yè)務(wù) Binlog 數(shù)據(jù)突增時(shí)可以方便快速的擴(kuò)容 Consumer。 馬蜂窩配置中心支持了各個接入業(yè)務(wù)的配置管理,相比于開源項(xiàng)目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同業(yè)務(wù)不同環(huán)境的配置。 五、日志與監(jiān)控馬蜂窩微服務(wù)的日志 ELK 提供了記錄和查詢?nèi)罩镜耐緩剑覀儗τ诮尤霕I(yè)務(wù)的每一條 Binlog 的數(shù)據(jù)流轉(zhuǎn)都記錄了日志。如針對一個訂單,拿到的 Kafka Message 是什么,調(diào)用_bulk 接口時(shí)的 Post Payload 是什么,_bulk 接口的 Response有沒有錯誤信息等。 除了方便于排查問題,日志也是監(jiān)控的一部分。目前監(jiān)控的指標(biāo)有兩個,一個是數(shù)據(jù)同步延時(shí),同步延時(shí)計(jì)算的是該條 Binlog 從產(chǎn)生到寫入 Elasticsearch 的時(shí)間差。 從上圖中可以看出,訂單各個表的數(shù)據(jù)同步延時(shí)平均在 1s 左右。把延時(shí)數(shù)據(jù)接入 ElastAlert,在延時(shí)數(shù)據(jù)過多時(shí)發(fā)送報(bào)警通知。 另一個監(jiān)控指標(biāo)是心跳檢測,單獨(dú)建立一張獨(dú)立于業(yè)務(wù)的表,crontab 腳本每分鐘修改一次該表,同時(shí)檢查上一次修改是否同步到了指定的索引,如果沒有,則發(fā)送報(bào)警通知。該心跳檢測,監(jiān)控了整個流程上的 Kafka、微服務(wù)和 ES,任何一個會導(dǎo)致數(shù)據(jù)不同步的環(huán)節(jié)出問題,都會第一個接到通知。 六、結(jié)語目前接入的最重要業(yè)務(wù)方是電商的訂單索引,數(shù)據(jù)同步延時(shí)穩(wěn)定在 1s 左右。這次的開源項(xiàng)目本地化實(shí)踐,希望能為一些有 Elasticsearch 數(shù)據(jù)同步需求的業(yè)務(wù)場景提供幫助。 本文作者:張坤,馬蜂窩電商研發(fā)團(tuán)隊(duì)度假業(yè)務(wù)高級研發(fā)工程師。
|
|