平時ogg數據同步的案例中,遇到最多就是常見RDBMS之間的同步,源端和目標端分別找合適安裝包,按照經典的抽取、傳輸、復制3個進程進行實施,例如oracle到oracle,oracle到mysql,sqlserver到oracle等等場景。早就知道ogg也可以同步數據到Hahoop等BigData平臺及Kafka等消息中間件進行同步,今天終于遇到這樣的實際需求了。
需求:某系統需要原生實時數據采集到大數據平臺。根據統一的實現方案,kafka規劃原則:
省分topic隔離。不同省分,不共用相同的topic;
依據省份生產庫實例建設情況,按實例劃分對接topic,要求kafka一個topic對應一個物理數據庫實例;
為保證每張表數據在kafka中保證“有序”,要求每張表對應topic的一個partition;
具體省分kafka規劃方案如下:
針對本次環境源端是oracle12c數據庫,目標端是Kafka2.11-1.0.2集群。源端使用ogg12cfor oracle即可,目標端我們需要使用OGG_BigData的軟件包來實現的。根據官方的版本適配文檔,確定使用19c19.1來搭建。
源端軟件包
123014_fbo_ggs_Linux_x64_shiphome.zip
(12.3.0.1.4OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO)
目標端軟件包
OGG_BigData_Linux_x64_19.1.0.0.1.zip
(19.1.0.0.2OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039)
對于kafka集群來說,ogg的目標端程序實際上是作為kafka的生產者客戶端,把解析trail文件得到的數據推送到kafka中。
源端數據庫12cRAC的多租戶模式,使用ogg12c12.3的集成模式,按常規配置ex_kaf、dp_kaf進程即可。
抽取進程EX_KAF參數
##view param EX_KAF EXTRACT ex_kaf SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK) setenv (ORACLE_HOME=/u01/app/oracle/product/12.2.0.1/db_1) SETENV (ORACLE_SID=db2) USERID c##ggs@db, PASSWORD XXXX EXTTRAIL ./dirdat/ha --DISCARDFILE ./dirrpt/ex_kaf.DSC, APPEND,MEGABYTES 100 -- report info REPORTCOUNT EVERY 10 MINUTES, RATE WARNLONGTRANS 2h,CHECKINTERVAL 30m dboptions allowunusedcolumn fetchoptions nousesnapshot LOGALLSUPCOLS //加入前鏡像(12c新版本特有參數,遇到josn格式不固定,新老參數同時加) getupdatebefores // 加入前鏡像(老版本參數) nocompressdeletes / /加入前鏡像(老版本參數) nocompressupdates // 加入前鏡像(老版本參數) --crm3hj 208 tables TABLE crm3hj.CUST.TAB; |
##view param DP_KAF EXTRACT dp_kaf PASSTHRU //傳輸進程透傳,不做任何處理 rmthost 192.168.100.100, mgrport 7809 , TCPBUFSIZE 300000, TCPFLUSHBYTES 300000//目標端地址 rmttrail ./dirdat/hk EOFDELAYCSECS 10 --gettruncates --crm3hj 8 tables TABLE crm3hj.CUST.TAB; |
解壓ogg壓縮包
tar xvfOGG_BigData_Linux_x64_19.1.0.0.1.tar -C /oggdata/ggv191adp
因為要登錄到kafka集群,需要引用對應jar包,故需部署
tar -zxvfkafka_2.11-1.0.2.tgz -C /oggdata/kafka
kafka的lib目錄為 /oggdata/kafka/kafka_2.11-1.0.2/libs
參數文件配置,這里除了rp_kaf進程參數文件外,還有kafka屬性參數文件、生產者屬性配置參數文件。
rp_kaf.prm參數文件
[oracle@server003 dirprm]$ cat rp_kaf.prm REPLICAT rp_kaf //定義進程名稱 sourcedefs ./dirdef/o2kaf_jt.def //指定使用源和目標的表映射文件,高版本可省略 TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_crm_ha.props //定義kafka一些適配性的庫文件以及配置文件,配置文件指定屬性文件 --reperror default, abend reperror default, discard //錯誤處理,這里將錯誤信息記錄DSC文件 DISCARDFILE ./dirrpt/rp_kaf.DSC, APPEND, MEGABYTES 4096 //對DSC文件的屬性定義 REPORTCOUNT EVERY 10 MINUTES, RATE //報告指定10min統計一次報告 --grouptransops 10000 //組提交,以事務傳輸時,事務合并的單位,減少IO操作 MAP CRM3HJ.CUST.TAB, TARGET CUST. TAB; //具體表的映射配置 |
kafka_crm_ha.props參數文件
[oracle@server003 dirprm]$ cat kafka_crm_ha.props gg.handlerlist=kafkahandler //handler類型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.topicMappingTemplate=tp_share_crm //指定kafka的topic gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer_JT.properties //指定kafka生產者配置文件 gg.handler.kafkahandler.ProducerRecordClass=oracle.goldengate.handler.kafka.MyCreateProducerRecordHa//生產者方法 gg.handler.kafkahandler.BlockingSend=false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op //OGG for BigData中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務傳輸一次 gg.handler.kafkahandler.format=json #傳輸的消息最終解析的格式,格式相關 gg.handler.kafkahandler.format.includePrimaryKeys=true gg.handler.kafkahandler.format.insertOpKey=I gg.handler.kafkahandler.format.updateOpKey=U gg.handler.kafkahandler.format.deleteOpKey=D gg.handler.kafkahandler.authType=kerberos //kerberos安全認證相關 gg.handler.kafkahandler.kerberosKeytabFile=/home/oracle/KDC/kafka_XXXX.keytab gg.handler.kafkahandler.kerberosPrincipal=kafka_XXXX@HADOOP.XXXX.CN goldengate.userexit.timestamp=utc+8 goldengate.userexit.writers=javawriter javawriter.stats.display=true javawriter.stats.full=true javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/oracle/kafka_jass.conf //JVM設置 gg.log=log4j //日志記錄 gg.log.level=INFO gg.report.time=30sec gg.classpath=dirprm/:/oggdata/kafka/kafka_2.11-1.0.2/libs/*:/oggdata/ggv191adp/:/oggdata/ggv191adp/lib/* #Kafka的lib目錄 |
核心參數說明:
custom_kafka_producer.properties參數文件
[oracle@server003 dirprm]$ cat custom_kafka_producer_JT.properties bootstrap.servers=XXXX.COM //kafka集群的broker的地址 acks=1 //參考KAFKA的acks參數 compression.type=gzip //壓縮類型 reconnect.backoff.ms=1000 //重連延時 max.request.size=5024000 //請求發送設置 send.buffer.bytes=5024000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka // kerberos安全認證相關 saal.machanism=GSSAPI |
測試實踐過程中,遇到最大的問題就是默認情況下,ogg會把所有表都放在同一topic下,而根據規范文檔,不同的表要對應同一topic下的不同分區。
實際需求是每個表對應一個partition,例如
我們在kafka屬性參數文件中,可以指定自定義生產者方法,繼承ogg自帶的生產者父類,編寫自己的生產者方法這樣就能實現表與分區的對應關系。
gg.handler.kafkahandler.ProducerRecordClass=MyCreateProducerRecordXX.java
第1步:編寫MyCreateProducerRecordXX.java文件
新建MyCreateProducerRecordXX類,實現ogg預定義好的接口方法CreateProducerRecord,編寫自定義createProducerRecord方法。
第2步:編譯jar包
第3步:替換jar包
[oracle@server003 lib]$ls -rtlh ggkafka-19.1.0.0.1.003.jar
-rwxr-xr-x 1 oracleoinstall 27K Sep 26 03:37 ggkafka-19.1.0.0.1.003.jar
[oracle@server003 lib]$pwd
/oggdata/ggv191adp/ggjava/resources/lib
第4步:指定使用的自定義方法
rp_kaf進程同步的數據,也就是生產的消息其實是json格式的DML操作數據,我們可以使用消費者命令檢查查看數據內容:表名、操作類型、更新時間、主鍵信息、數據行before鏡像,數據行after鏡像。
1) ogg往kafka傳數據大體上還是之前的套路,區別點就在于怎么把復制進程當做客戶端,當做生產者去往kafka對應的topic上生產數據。
2) Kafka作為高吞吐量、低延遲、高并發的消息中間件產品,我們的同步進程甚至不需要考慮目標端的性能問題,只要往kafka上推送數據,最終的數據使用則是另一端的消費者程序怎么使用數據的問題。
3) 既然是BigData的adapter軟件包,還可以實現往HDFS、Hive、Hbase、ApacheCassandra、MongoDB、Greenplum等多種開源產品中同步數據,基本與kafka的配置類似,自定義類的實現為數據同步提供了更多的可能性,有待嘗試。
4) 實際生產中的配置,還涉及到安全認證的問題,KDC的認證在這里省略。
5) 擴展聯想一下,如果kafka消費者程序可以連接到不同的數據庫、不同的大數據開源組件進行數據的消費,那么就可以形成一個統一的模式,ogg_for_XXDB? ogg_for Big Data?KAFKA?任意數據庫。
6) 再聯想一下,在kafka上看到json格式里有數據變化的前后鏡像,是不是可以結合這個做一個基于ogg的數據操作閃回功能?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/130014.html
摘要:從零開始設計開發一個日處理數據億的大數據高并發實時系統,哪些性能問題需要特別注意這里我們一起梳理一下本文中我將以,同學戲稱的系統網易云捕設計開發實踐中兩年的時間里碰到的真實問題,踩過的坑及解決問題的方法和大家一起討論如何解決這些問題。 本文由作者余寶虹授權網易云社區發布。 從零開始設計開發一個日處理數據8億的大數據高并發實時系統,哪些性能問題需要特別注意?這里我們一起梳理一下,本文中我...
摘要:慕課網流處理平臺學習總結時間年月日星期日說明本文部分內容均來自慕課網。 慕課網《Kafka流處理平臺》學習總結 時間:2018年09月09日星期日 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 課程介紹 Kafk...
OGG Integrated Native DDL簡單測試 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%;...
閱讀 1346·2023-01-11 13:20
閱讀 1684·2023-01-11 13:20
閱讀 1132·2023-01-11 13:20
閱讀 1858·2023-01-11 13:20
閱讀 4100·2023-01-11 13:20
閱讀 2704·2023-01-11 13:20
閱讀 1385·2023-01-11 13:20
閱讀 3597·2023-01-11 13:20