題記
logstash和kafka_connector都僅支持基于自增id或者時間戳更新的方式 回到問題本身:如果庫表里沒有相關(guān)字段,該如何處理呢? 本文給出相關(guān)探討和解決方案。 1、 binlog認(rèn)知1.1 啥是 binlog?binlog是Mysql sever層維護(hù)的一種 作用主要有:
1.2 阿里的Canal實(shí)現(xiàn)了增量Mysql同步一圖勝千言,canal是用java開發(fā)的基于數(shù)據(jù)庫增量日志解析、提供增量數(shù)據(jù)訂閱&消費(fèi)的中間件。 目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關(guān)數(shù)據(jù)。目的: 綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實(shí)現(xiàn)增量同步。 2、基于binlog的同步方式1)基于kafka Connect的Debezium 開源工程,地址:. https:/// 2)不依賴第三方的獨(dú)立應(yīng)用: Maxwell開源項(xiàng)目,地址:http:/// 由于已經(jīng)部署過conluent(kafka的企業(yè)版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。 3、Debezium介紹Debezium是捕獲數(shù)據(jù)實(shí)時動態(tài)變化的開源的分布式同步平臺。能實(shí)時捕獲到數(shù)據(jù)源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、刪除(deletes)操作,實(shí)時同步到Kafka,穩(wěn)定性強(qiáng)且速度非常快。 特點(diǎn):
4、同步架構(gòu)如圖,Mysql到ES的同步策略,采取“曲線救國”機(jī)制。 步驟1: 基Debezium的binlog機(jī)制,將Mysql數(shù)據(jù)同步到Kafka。 步驟2: 基于Kafka_connector機(jī)制,將kafka數(shù)據(jù)同步到Elasticsearch。 5、Debezium實(shí)現(xiàn)Mysql到ES增刪改實(shí)時同步軟件版本:
5.1 Debezium安裝confluent的安裝部署參見:http:///Ef5poZk,不再贅述。 Debezium的安裝只需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓后的插件目錄(share/java)中。 MySQL Connector plugin 壓縮包的下載地址:
注意重啟一下confluent,以使得Debezium生效。 5.2 Mysql binlog等相關(guān)配置。Debezium使用MySQL的binlog機(jī)制實(shí)現(xiàn)數(shù)據(jù)動態(tài)變化監(jiān)測,所以需要Mysql提前配置binlog。 核心配置如下,在Mysql機(jī)器的/etc/my.cnf的mysqld下添加如下配置。 1[mysqld] 然后,重啟一下Mysql以使得binlog生效。 1systemctl start mysqld.service 5.3 配置connector連接器。配置confluent路徑目錄 : /etc 創(chuàng)建文件夾命令 : 1mkdir kafka-connect-debezium 在mysql2kafka_debezium.json存放connector的配置信息 : 1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json 注意如下配置:
坑一:transforms相關(guān)5行配置作用是寫入數(shù)據(jù)格式轉(zhuǎn)換。 如果沒有,輸入數(shù)據(jù)會包含:before、after記錄修改前對比信息以及元數(shù)據(jù)信息(source,op,ts_ms等)。 這些信息在后續(xù)數(shù)據(jù)寫入Elasticsearch是不需要的。(注意結(jié)合自己業(yè)務(wù)場景)。 格式轉(zhuǎn)換相關(guān)原理:http:///EftoaIi 5.4 啟動connector1curl -X POST -H "Content-Type:application/json" 5.5 驗(yàn)證寫入是否成功。5.5.1 查看kafka-topic1 kafka-topics --list --zookeeper localhost:2181 此處會看到寫入數(shù)據(jù)topic的信息。 注意新寫入數(shù)據(jù)topic的格式:database.schema.table-smt 三部分組成。 本示例topic名稱: full.kafka_base_db.account-smt 5.5.2 消費(fèi)數(shù)據(jù)驗(yàn)證寫入是否正常1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning 至此,Debezium實(shí)現(xiàn)mysql同步kafka完成。 6、kafka-connector實(shí)現(xiàn)kafka同步Elasticsearch6.1、Kafka-connector介紹見官網(wǎng):https://docs./current/connect.html Kafka Connect是一個用于連接Kafka與外部系統(tǒng)(如數(shù)據(jù)庫,鍵值存儲,檢索系統(tǒng)索引和文件系統(tǒng))的框架。 連接器實(shí)現(xiàn)公共數(shù)據(jù)源數(shù)據(jù)(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫,也可以自己開發(fā)連接器。 6.2、kafka到ES connector同步配置配置路徑: 1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties 配置內(nèi)容: 1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", 6.3 kafka到ES啟動connector啟動命令 1confluent load elasticsearch-sink-test 6.4 Kafka-connctor RESTFul API查看Mysql2kafka,kafka2ES的connector詳情信息可以借助postman或者瀏覽器或者命令行查看。 1curl -X GET http://localhost:8083/connectors 7、坑復(fù)盤。坑2: 同步的過程中可能出現(xiàn)錯誤,比如:kafka topic沒法消費(fèi)到數(shù)據(jù)。
1curl -X GET http://localhost:8083/connectors-xxx/status 坑3: Mysql2ES出現(xiàn)日期格式不能識別。 是Mysql jar包的問題,解決方案:在my.cnf中配置時區(qū)信息即可。 坑4: kafka2ES,ES沒有寫入數(shù)據(jù)。 排解思路:
8、小結(jié)
參考: [2] http:///EftXJU6 [3] http:///EftXO8c [4] http:///EftXn9M [5] http:///EftXeOc |
|