Flink具備高吞吐、低延遲、純流式架構、支持對亂序事件的處理、有狀態、高度靈活的窗口定制、失敗恢復、故障轉移、水平擴展、批處理、流處理統一的API等大數據處理優勢。基于大數據的應用場景中,從數據生產,到數據收集、數據處理、數據應用,貫穿整個大數據生命周期全棧的每個環節,Flink 均可應用其中。作為新一代開源大數據計算引擎,Flink 不僅滿足海量數據對實時性的需求,且能夠全鏈路打通端到端的數據價值挖掘。
基于開源組件的架構如果能實現性能最優化,那就是高潮迭起,掌聲不斷,如果架構性能不能最優化,那就是爺爺趕牛車,急死孫子。
筆者所在項目的日志綜管平臺架構使用了Flink組件,遇到了實時計算延遲的性能問題,下面筆者將和團隊一起解決該性能問題的過程分享如下。
運行模式 | Flink on yarn |
Fink | 1.7.2 |
Hadoop | 2.8.5 |
計算節點數 | 9臺虛擬機(單臺cpu:12核內存:64GB 硬盤:550G) |
kafka | 2.1 |
業務高峰期處理數據量(每分鐘) | 860w |
生成指標 | 30個 |
跑的任務數 | 8個 |
如圖所示,flink處理的業務鏈數據量從某一時間點突然出現斷崖式下降,數據處理積壓越來嚴重,flink 任務背壓較高,同時指標出現延時生成現象(正常處理延時1分以內)。
首先通過查看flink業務鏈處理日志,發現疑似線索。日志顯示任務連接上游kafka報Disconnection連接異常現象。當指標延時時,此錯誤信息報頻率較高,但指標不延時偶爾也會報錯,是否這就是導致問題的罪魁禍首?根據這一線索,繼續刨根問底:
分析及措施:
上游kafka采用kerberos認證機制,每隔24小時需要重新認證(調用專有客戶端進行認證),flink 9臺計算節點上部署自動認證腳本,每隔10分鐘程序自動認證,Disconnection連接異常現象出現頻率減少,但指標延時情況還在存在。
調整flink 連接kafka消費topic參數
default.api.timeout.ms
session.timeout.ms
request.timeout.ms
fetch.max.wait.ms
fetch.min.bytes
調整連接參數后Disconnection連接異常現象未出現,但指標延時現象依然存在。
通過監測上游kafka topic 消費分組Lag值,發現是下游消費滯后造成數據積壓現象。
分析結論:通過以上監測與優化措施,指標生成延遲問題仍未解決,排除由Kafka引起指標延時的可能性。
通過上述優化整改,Flink與kafka連接異常問題解決,但延遲的問題還是存在,革命尚未成功,吾輩仍需繼續深入分析。經比對多天日志,發現每次任務重啟前都有checkpoint失敗,ClosedByInterruptException異常現象
分析及措施:
因為業務鏈業務量巨大(高峰期每分鐘需處理的數據量達800萬左右),在有限flink計算節點上(9臺虛擬機),按照要求需要滿足幾十個指標在1分鐘內不出現延時生成。當任務重啟后如果從歷史檢查點恢復處理消費數據,數據量積壓概率較高,無法保障指標生成不延時。所以,重啟處理機制更改為每次任務重啟后從當前時間點消費kafka 數據,而非從檢查點開始。
關閉checkpoint后,無對應異常日志,但指標生成延遲問題依然存在。
分析結論:雖然對該可疑目標進行了tunning,但延遲依舊存在,進一步排除了checkpoint失敗導致指標延時的可能性。
排除以上兩種情況后,繼續對flink組件本身的運行狀態做全面綜合深入分析。
分析及措施:
加大并發數處理:業務鏈kafka topic 是100 partition,正常下游Flink需要開100 個并發與partition個數一一對應起來,如此配置性能才能最優。但當前flink集群資源有限(flink集群機器上跑了其它96個并發任務),無法開啟100 個并發(經測試最大可開啟72 個并發)。按可用最大并發配置后,計算節點cpu 負載30%以下,但指標仍出現延時,看來擴大并發數解決不了延時問題。
線程運行狀況
通過分析程序運行狀態,發現shsncSpanEventStream pre -> Timestamps/Watermarks 這段邏輯有線程鎖現象:
"AsyncIO-Emitter-Thread (trans stream -> Process -> shsncSpanEventStream pre -> Timestamps/Watermarks (28/72))" #181 daemon prio=5 os_prio=0 tid=0x00007f4da3cf8000 nid=0x1324c waiting for monitor entry [0x00007f4d8bdeb000]
java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:125)
- waiting to lock <0x0000000610a5b028> (a java.lang.Object)at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
背壓運行狀況
從任務背壓圖上看,處理延時是堵在入口解析及下游水位處理環節點邏輯上:
優化措施:
flink共享waterMaker機制,在數據源進行waterMaker注冊,減少邏輯處理N倍;
對應用吐過來業務數據SPAN和SPANEVENT進行分流處理,提高程序處理速度;
增加過濾數據邏輯,過濾掉無需做指標計算的數據,減少程序數據處理量。
業務鏈flink任務入redis/es拆分出來做多帶帶算子進行入庫。
任務并發數,調整為50個并發數,消費kafka topic(topic 100 partition)
實施以上優化措施后,問題依舊,延時并沒有得到緩解和解決。由于在上一步為了排除checkpoint原因,關閉了checkpoint,關閉check后雖然沒有解決延時問題,但是由于關閉了checkpoint程序不會因為checkpoint失敗而停止,因此得以觀察延時情況下程序gc和堆棧具體使用情況。
gc分析:指標延遲時,通過jstat 監測發現flink 計算節點不斷做FGC,雖然FGC已經達到每1秒一次(FGC時JVM會暫停,導致程序卡頓延時),但是老年代并沒有任何回收(一直是99.98),因此可以判斷出現了內存泄漏,究竟是哪個位置出現了內存泄漏呢?
jmap分析: 通過jmap查看堆使用排行,驚訝的發現排在第一是Atomiclong類,占堆內存達到恐怖的2.7G,而我們的代碼并沒有顯示使用Atomiclong類,排第二的是[C,表示字符串,字符串在程序中用的很多,排第二屬正常,第三還是Atomiclong類,這個Atomiclong類究竟是哪個對象引用的呢?第四是genericonobjectpool,這個也不正常,程序中連接池對象竟然有372198個,哪里用得了這么多,還有一個jedisFactory類,一個工廠類竟然也有37萬個實例,也是有問題的。
mat分析:
通過簡單的jmap命令,發現很多不正常的類占據著堆內存的前幾名,這些類究竟有什么關系,誰才是罪魁禍首?只好使出我們的終極MAT分析大法。
通過分析導出生成的dump, 整個dump文件有6.7G,使用32G內存的機器經過10多分鐘的處理,才得到以下分析結果。
分析報告顯示ScheduledThreadPoolExecutor對象持有4.3GB堆內存未釋放,堆餅圖中占有97%
點進去查看樹圖,發現ScheduledThreadPoolExecutor對象持有4.3GB堆內存全部是GenericObjectPool對象(4.3G,接近1百萬個對象)
再點擊GenericObjectPool展開后發現:
之前通過jmap分析排行在前的AtomicLong(排第一,占2.7G)和redisFactory類都是躲藏在GenericObjectPool對象中的。分析至此,本人的第六擼感告訴我,離事情的真相越來越近了。與redis連接相關的GenericObjectPool對象就是問題的真兇,內存泄漏的對象。
看到GenericObjectPool連接池對象不釋放,首先想到的是連redis的連接池去掉。將flink任務與redis交互的代碼去掉GenericObjectPool連接池,采用直接獲取redisCluseter對象方式:
(上圖是初始代碼,JedisCluter保存在GenericObjectPool連接池中)
(去掉GenericObjectPool連接池后只獲取JedisCluster對象)
結果:問題未緩解,未解決,還得繼續。
由于去掉和redis的連接池未解決問題,依然生成大量GenericObjectPool對象不釋放,一個推測是并發原因導致單例沒有生效,生成了很多個JedisCluster對象,而JedisCluster對象包含了連接池。嘗試synchronized加鎖:
結果:問題仍未緩解,仍未解決,還得繼續。
上兩步都沒有進展,從頭開始分析代碼,代碼分析過程中發現flink十多個任務都是使用統一的redis初始化方法且只生成單個redis連接實例。十多個flink任務, 每個flink任務中又有許多地方需要用到redis連接,redis單例在這里就會成為瓶頸(數據處理不過來,進而積壓)。于是變單例的redisCluseter對象為多帶帶變量,每個用到redis連接的類都生成redisCluseter變量,然后再與redis交互,以此使redis隨Flink的連接數并發派生。
整改結果:問題得到階段性解決,之前運行一天就出現堆和gc問題,整改后穩定運行三天后又出現同樣問題。
雖然只穩定運行三天,但對筆者和整個團隊來說,也還是很開心的,說明我們的方向大概率是對的。但問題復現,作為四有好青年的IT民工,咱得發揚不怕苦,不怕累的精神繼續分析排查。這次排查過程中發現眾多的GenericObjectPool是由ScheduledThreadPoolExector引用的,ScheduledThreadPoolExector是一個異步類,再排查flink中的異步方法。找到AsyncDataStream.unorderedWait()是異步寫入redis方法,將其修改為改造后的官方flink-redis連接包,去除異步。
結果:問題解決,堆和gc一直正常
業務鏈指標生成正常:
指標數據量正常:
未發現有線程鎖現象:
Gc 正常:
1.通過此次問題一波三折的解決過程,筆者總結在排查分析處理相關開源組件的性能問題時,要充分利用jdk自帶的stat/jmap/jstack等內存分析工具及相關開源性能監測工具(如arthas)對進程運行狀態進行深入分析,找出性能瓶頸,如死鎖,fgc頻繁等。
2.通過hadoop web管理界面,自帶背壓監測圖及metrics監測指標圖可以查看任務運行現狀。條件充許情況下,建議利用Prometheus工具對metrics進行實時監測。
3.結合日志,分階段分析任務邏輯存在的性能瓶頸,然后通過一系列的優化措施(拆分/合并/過濾/異步)提高任務處理性能。
開源組件架構的最優化使用是基于海量業務場景不斷迭代進化而來,這個過程對自己對團隊都是一種歷練和精進。在問題得到最終解決,性能得到大幅提升,業務流暢運行后,有種發自內心的會當凌絕頂,一覽眾山小的成就感。最后感謝那些通宵排查問題的夜晚和我一起并肩作戰的兄弟們。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/130230.html
摘要:基于在阿里巴巴搭建的平臺于年正式上線,并從阿里巴巴的搜索和推薦這兩大場景開始實現。在經過一番調研之后,阿里巴巴實時計算認為是一個非常適合的選擇。接下來,我們聊聊阿里巴巴在層對又大刀闊斧地進行了哪些改進。 Apache Flink 概述 Apache Flink(以下簡稱Flink)是誕生于歐洲的一個大數據研究項目,原名StratoSphere。該項目是柏林工業大學的一個研究性項目,早期...
摘要:另外,將機制發揚光大,對有著非常好的支持。系統也注意到并討論了和的問題。總結本文分享了四本相關的書籍和一份領域相關的論文列表篇,涉及的設計,實現,故障恢復,彈性擴展等各方面。 前言 之前也分享了不少自己的文章,但是對于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點 Flink 相關的資料(國外數據 pdf 和流處理相關的 Paper),期望可以幫你更好的理解 Flink。...
摘要:第三個就是比較重點的內容,在有贊的實踐。第四部分是將實時計算化,界面化的一些實踐。二有贊實時平臺架構有贊的實時平臺架構呢有幾個主要的組成部分。實時平臺提供了集群管理,項目管理,任務管理和報警監控的功能。。 一、前言 這篇主要由五個部分來組成: 首先是有贊的實時平臺架構。 其次是在調研階段我們為什么選擇了 Flink。在這個部分,主要是 Flink 與 Spark 的 structure...
摘要:第三個就是比較重點的內容,在有贊的實踐。第四部分是將實時計算化,界面化的一些實踐。二有贊實時平臺架構有贊的實時平臺架構呢有幾個主要的組成部分。實時平臺提供了集群管理,項目管理,任務管理和報警監控的功能。。 一、前言 這篇主要由五個部分來組成: 首先是有贊的實時平臺架構。 其次是在調研階段我們為什么選擇了 Flink。在這個部分,主要是 Flink 與 Spark 的 structure...
閱讀 1346·2023-01-11 13:20
閱讀 1684·2023-01-11 13:20
閱讀 1132·2023-01-11 13:20
閱讀 1858·2023-01-11 13:20
閱讀 4099·2023-01-11 13:20
閱讀 2704·2023-01-11 13:20
閱讀 1385·2023-01-11 13:20
閱讀 3594·2023-01-11 13:20