国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

使用canal+Kafka進行數據庫同步實踐

Tecode / 2740人閱讀

摘要:比如,服務數據庫的數據來源于服務的數據庫服務的數據有變更操作時,需要同步到服務中。第二種解決方案通過數據庫的進行同步。并且,我們還用這套架構進行緩存失效的同步。目前這套同步架構正常運行中,后續有遇到問題再繼續更新。

在微服務拆分的架構中,各服務擁有自己的數據庫,所以常常會遇到服務之間數據通信的問題。比如,B服務數據庫的數據來源于A服務的數據庫;A服務的數據有變更操作時,需要同步到B服務中。

第一種解決方案:

在代碼邏輯中,有相關A服務數據寫操作時,以調用接口的方式,調用B服務接口,B服務再將數據寫到新的數據庫中。這種方式看似簡單,但其實“坑”很多。在A服務代碼邏輯中會增加大量這種調用接口同步的代碼,增加了項目代碼的復雜度,以后會越來越難維護。并且,接口調用的方式并不是一個穩定的方式,沒有重試機制,沒有同步位置記錄,接口調用失敗了怎么處理,突然的大量接口調用會產生的問題等,這些都要考慮并且在業務中處理。這里會有不少工作量。想到這里,就將這個方案排除了。

第二種解決方案:

通過數據庫的binlog進行同步。這種解決方案,與A服務是獨立的,不會和A服務有代碼上的耦合。可以直接TCP連接進行傳輸數據,優于接口調用的方式。 這是一套成熟的生產解決方案,也有不少binlog同步的中間件工具,所以我們關注的就是哪個工具能夠更好的構建穩定、性能滿足且易于高可用部署的方案。

經過調研,我們選擇了canal[https://github.com/alibaba/canal]。canal是阿里巴巴 MySQL binlog 增量訂閱&消費組件,已經有在生產上實踐的例子,并且方便的支持和其他常用的中間件組件組合,比如kafka,elasticsearch等,也有了canal-go go語言的client庫,滿足我們在go上的需求,其他具體內容參閱canal的github主頁。

原理簡圖

OK,開始干!現在要將A數據庫的數據變更同步到B數據庫。根據wiki很快就用docker跑起了一臺canal-server服務,直接用canal-gocanal-client代碼邏輯。用canal-go直接連canal-server,canal-servercanal-client之間是Socket來進行通信的,傳輸協議是TCP,交互協議采用的是 Google Protocol Buffer 3.0。

工作流程

1.Canal連接到A數據庫,模擬slave

2.canal-client與Canal建立連接,并訂閱對應的數據庫表

3.A數據庫發生變更寫入到binlog,Canal向數據庫發送dump請求,獲取binlog并解析,發送解析后的數據給canal-client

4.canal-client收到數據,將數據同步到新的數據庫

Protocol Buffer的序列化速度還是很快的。反序列化后得到的數據,是每一行的數據,按照字段名和字段的值的結構,放到一個數組中 代碼簡單示例:

func Handler(entry protocol.Entry)  {
    var keys []string
    rowChange := &protocol.RowChange{}
    proto.Unmarshal(entry.GetStoreValue(), rowChange)
    if rowChange != nil {
        eventType := rowChange.GetEventType()
        for _, rowData := range rowChange.GetRowDatas() { // 遍歷每一行數據             if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {
                 columns := rowData.GetBeforeColumns() // 得到更改前的所有字段屬性             } else if eventType == protocol.EventType_INSERT {
                 columns := rowData.GetAfterColumns() // 得到更后前的所有字段屬性             }
            ......
        }
    }
} 

遇到的問題

為了高可用和更高的性能,我們會創建多個canal-client構成一個集群,來進行解析并同步到新的數據庫。這里就出現了一個比較重要的問題,如何保證canal-client集群解析消費binlog的順序性呢?

我們使用的binlog是row模式。每一個寫操作都會產生一條binlog日志。 舉個簡單的例子:插入了一條a記錄,并且立馬修改a記錄。這樣會有兩個消息發送給canal-client,如果由于網絡等原因,更新的消息早于插入的消息被處理了,還沒有插入記錄,更新操作的最后效果是失敗的。

怎么辦呢?canal可以和消息隊列組合呀!而且支持kafka,rabbitmq,rocketmq多種選擇,如此優秀。我們在消息隊列這層來實現消息的順序性。(后面會說怎么做)

選擇canal+kafka方案

我們選擇了消息隊列的業界標桿: kafka UCloud提供了kafka和rocketMQ消息隊列產品服務,使用它們能夠快速便捷的搭建起一套消息隊列系統。加速開發,方便運維。

下面就讓我們來一探究竟:

①選擇kafka消息隊列產品,并申請開通

②開通完成后,在管理界面,創建kafka集群,根據自身需求,選擇相應的硬件配置

③一個kafka+ZooKeeper集群就搭建出來了,給力!

并且包含了節點管理、Topic管理、Consumer Group管理,能夠非常方便的直接在控制臺對配置進行修改

監控視圖方面,監控的數據包括kafka生成和消費QPS,集群監控,ZooKeeper的監控。能夠比較完善的提供監控指標。

canal的kafka配置

canal配上kafka也非常的簡單。 vi /usr/local/canal/conf/canal.properties

# ...
# 可選項: tcp(默認), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務
canal.mq.transaction = false

# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2..*,.*..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable

具體見:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

解決順序消費問題

看到下面這一行配置

canal.mq.partitionHash=mydatabase.mytable

我們配置了kafka的partitionHash,并且我們一個Topic就是一個表。這樣的效果就是,一個表的數據只會推到一個固定的partition中,然后再推給consumer進行消費處理,同步到新的數據庫。通過這種方式,解決了之前碰到的binlog日志順序處理的問題。這樣即使我們部署了多個kafka consumer端,構成一個集群,這樣consumer從一個partition消費消息,就是消費處理同一個表的數據。這樣對于一個表來說,犧牲掉了并行處理,不過個人覺得,憑借kafka的性能強大的處理架構,我們的業務在kafka這個節點產生瓶頸并不容易。并且我們的業務目的不是實時一致性,在一定延遲下,兩個數據庫保證最終一致性。

下圖是最終的同步架構,我們在每一個服務節點都實現了集群化。全都跑在UCloud的UK8s服務上,保證了服務節點的高可用性。

canal也是集群換,但是某一時刻只會有一臺canal在處理binlog,其他都是冗余服務。當這臺canal服務掛了,其中一臺冗余服務就會切換到工作狀態。同樣的,也是因為要保證binlog的順序讀取,所以只能有一臺canal在工作。

并且,我們還用這套架構進行緩存失效的同步。我們使用的緩存模式是:Cache-Aside。同樣的,如果在代碼中數據更改的地方進行緩存失效操作,會將代碼變得復雜。所以,在上述架構的基礎上,將復雜的觸發緩存失效的邏輯放到kafka-client端統一處理,達到一定解耦的目的。

    • *

目前這套同步架構正常運行中,后續有遇到問題再繼續更新。

更多內容,歡迎點擊下方作者主頁進行交流~

本文作者:UCloud應用研發工程師 Cary

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/125986.html

相關文章

  • 易用的 canal java 客戶端 canal-client

    摘要:易用的客戶端自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。 易用的canaljava 客戶端 canal 自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。先說一下實現的思路,首先canal 客戶端的消息對象...

    aboutU 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<