摘要:如果到齊了,就可以開始統計出這個時間窗口內的指標了。這種里會遇到兩個難題多個流的速度不一樣,如何判斷一個時間窗口內的都到齊了。
在本文發出之后不久,老外就寫了一篇類似內容的。人家比我寫得好,推薦大家讀這篇
http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101....
流式統計聽著挺容易的一個事情,說到底不就是數數嘛,每個告警系統里基本上都有一個簡單的流式統計模塊。但是當時基于storm做的時候,這幾個問題還是困擾了我很長時間的。沒有用過spark streaming/flink,不知道下面這些問題在spark streaming/flink里是不是都已經解決得很好了。
時間窗口切分問題做流式統計首要的問題是把一個時間窗口內的數據統計到一起。問題是,什么是時間窗口?有兩種選擇
日志時間(event timestamp)
墻上時間(wall clock)
最簡單的時間窗口統計的是基于“墻上時間”的,每過1分鐘就切分出一個新窗口出來。比如statsd,它的窗口切分就是這樣的。這種基于“墻上時間”的統計有一個非常嚴重的問題是不能回放數據流。當數據流是實時產生的時候,“墻上時間”的一分鐘也就只會有一分鐘的event被產生出來。但是如果統計的數據流是基于歷史event的,那么一分鐘可以產生消費的event數量只受限于數據處理速度。另外event在分布式采集的時候也遇到有快有慢的問題,一分鐘內產生的event未必可以在一分鐘內精確到達統計端,這樣就會因為采集的延遲波動影響統計數據的準確性。實際上基于“墻上時間”統計需要
collection latency = wall clock - event timestamp
基于“墻上時間”的統計需要采集延遲非常小,波動也很小才可以工作良好。大部分時候更現實的選擇是需要基于“日志時間”來進行窗口統計的。
使用“日志時間”就會引入數據亂序的問題,對于一個實時event stream流,其每個event的timestamp未必是嚴格遞增的。這種亂序有兩種因素引入:
event產生的機器的時鐘不完全同步(NTP有100ms左右的不同步)
event從采集到到達kafka的速度不均衡(不同的網絡線路有快有慢)
我們希望的流式統計是這樣的:
但是實際上數據只是基本有序的,也就是在時間窗口的邊緣會有一些event需要跨到另外一個窗口去:
最簡單的分發event到時間窗口代碼是這樣的
window index = event timestamp / window size
對1分鐘的時間窗口 window size 就是60,timestamp除以60為相同window index的event就是在同一個時間窗口的。問題的關鍵是,什么時候我可以確信這個時間窗口內的event都已經到齊了。如果到齊了,就可以開始統計出這個時間窗口內的指標了。然后突然又有一個落后于大伙的event落到這個已經被計算過的時間窗口如何處理?
對于大部分統計而言,一個時間窗口統計出多條結果存入db并不是什么大的問題,從db里查詢的時候把多條結果再合并就可以了。
對于一些類型的統計(非monad),比如平均值,時間窗口內的event分為兩批統計出來的結果是沒有辦法被再次匯總的。
實時類的計算對時間敏感,來晚了的數據就沒有意義了。比如告警,一個時間窗過去了就沒有必要再理會這個時間窗口了。
所以對于來晚了的數據就兩種策略:要么再統計一條結果出來,要么直接丟棄。要確定什么時候一個時間窗口內的event已經到齊了,有幾種策略:
sleep 等待一段時間(墻上時間)
event timestamp超過了時間窗口一點點不關閉當前時間窗口,而是要等event timestamp大幅超出時間窗口的時候才關閉窗口。比如12:05:30秒的event到了才關閉12:04:00 ~ 12:05:00的時間窗口。
一兩個event超出了時間窗口不關閉,只有當“大量”的event超出時間窗口才關閉。比如1個event超過12:05分不關閉,如果有100個event超過了12:05的時間窗口就關閉它。
三種策略其實都是“等”,只是等的依據不同。實踐中,第二種策略也就是根據“日志時間”的等待是最容易實現的。如果對于過期的event不是丟棄,而是要再次統計一條結果出來,那么過期的窗口要重新打開,又要經過一輪“等待”去判斷這個過去的窗口什么時候再被關閉。
在spark上已經有人做類似的嘗試了:Building Big Data Operational Intelligence platform with Apache Spark - Eric Carr (Guavus)
多流合并的問題一個kafka的partition就是一個流,一個kafka topic的多個partition就是多個獨立的流(offset彼此獨立增長)。多個kafka topic顯然是多個獨立的流。流式統計經常需要把多個流合并統計到一起。這種里會遇到兩個難題
多個流的速度不一樣,如何判斷一個時間窗口內的event都到齊了。如果按照前面的等待策略,可能處理一個流內部的基本有序局部亂序是有效的,但是對于多個流速差異很大的流就無能為力了。一個很快的流很容易把時間窗口往后推得很遠,把其他流遠遠跑到后面。
流速不均不能靠下游兜著,下游的內存是有限的。根本上是需要一種“背壓”的機制,讓下游通知流速過快的上游,你慢點產生新的event,等等其他人。
舉一個具體的例子:
spout 1 emit 12:05 spout 1 emit 12:06 spout 2 emit 12:04 spout 1 emit 12:07 spout 2 emit 12:05 // this is when 12:05 is ready
要想知道12:05這個時間窗的event都到齊了,首先要知道相關的流有幾個(在這例子里是spout1和spout2兩個流),然后要知道什么時候spout1產生了12:05的數據,什么時候spout2產生了12:05的數據,最后才可以判斷出來12:05的數據是到齊了的。在某個地方要存一份這樣的流速的數據去跟蹤,在窗口內數據到齊之后發出信號讓相關的下游往前推動時間窗口。考慮到一個分布式的系統,這個跟蹤要放在哪個地方做,怎么去通知所有的相關方。
極端一些的例子
spout 1 emit 13:05 spout 2 emit 12:31 spout 1 emit 13:06 spout 2 emit 12:32
多個流的流速可能會相差到半個小時以上。考慮到如果用歷史的數據匯入到實時統計系統里時,很容易因為計算速度不同導致不同節點之間的處理進度不一致。要計算出正確的結果,下游需要緩存這些差異的半個小時內的所有數據,這樣很容易爆內存。但是上游如何感知到下游要處理不過來了呢?多個上游之間又如何感知彼此之間的速度差異呢?又有誰來仲裁誰應該流慢一些呢?
一個相對簡單的做法是在整個流式統計的分布式系統里引入一個coordinator的角色。它負責跟蹤不同流的流速,在時間窗口的數據到齊之后通知下游flush,在一些上游流速過快的時候(比如最快的流相比最慢的流差距大于10分鐘)由coordinator發送backoff指令給流速過快的上游,然后接到指令之后sleep一段時間。一段基本堪用的跟蹤不同流流速的代碼:https://gist.github.com/taowen/2d0b3bcc0a4bfaecd404
數據一致性問題低檔一些的說法是這樣的。假設統計出來的曲線是這樣的:
如果中間,比如08:35左右重啟了統計程序,那么曲線能否還是連續的?
高檔一些的說法是,可以把流式統計理解為主數據庫與分析數據庫之間通過kafka消息隊列進行異步同步。主數據庫與分析數據庫之間應該保持eventual consistency。
要保證數據不重不丟,就要做到生產到kafka的時候,在主數據庫和kafka消息隊列之間保持一個事務一致性。舉一個簡單的例子:
用戶下了一個訂單 主數據庫里插入了一條訂單的數據記錄 kafka消息隊列里多了一條OrderPlaced的event
這個流程中一個問題就是,主數據插入成功了之后,可能往kafka消息隊列里enqueue event失敗。如果把這個操作反過來
用戶下了一個訂單 kafka消息隊列里多了一條OrderPlaced的event 主數據庫里插入了一條訂單的數據記錄
又可能出現kafka消息隊列里enqueue了,但是主數據庫插入失敗的情況。就kafka隊列的目前的設計而言,對這個問題是無解的。一旦enqueue的event,除非過期是無法刪除的。
在消費端,當我們從kafka里取出數據之后,去更新分析數據庫的過程也要保持一個分布式事務的一致性。
取出下一條OrderPlaced evnet(指向的offset+1) 當前時間窗的統計值+1 重復以上過程,直到窗口被關閉,數據寫入到分析數據庫
kafka的數據是可以重放的,只要指定offset就可以把這個offset以及之后的數據讀取出來。所謂消費的過程就是把客戶端保存的offset值加1的過程。問題是,這個offset指針保存在哪里的問題。常規的做法是把消費的offset保存到zookeeper里。那么這就有一個分布式的一致性問題了,zookeeper里offset+1了,但是分析數據庫并沒有實際把值統計進去。考慮到統計一般不是每條輸入的event都會更新分析數據庫,而是把中間狀態緩存在內存中的。那么就有可能消費了成千上萬個event,狀態都在內存里,然后“啪”的一下機器掉電了。如果每次讀取event都移動offset的話,這些event就丟掉了。如果不是每次都移動offset的話,又可能在重啟的時候導致重復統計。
搞統計的人在乎這么一兩條數據嗎?其實大部分人是不在乎的。不少團隊壓根連offset都不保存,每次開始統計直接seek到隊列的尾部開始。實時計算嘛,實時最重要了。準確計算?重放歷史?這個讓hadoop搞定就好了。但是如果就是要較這個真呢?或者我們不追求嚴格的強一致,只要求重啟之后曲線不斷開那么難看就好了。
別的流式計算框架不清楚,storm的ack機制是毫無幫助的。
storm的ack機制是基于每個message來做的。這就要求如果做一個每分鐘100萬個event的統計,一分鐘就要跟蹤100萬個message id。就算是100萬個int,也是一筆相當可觀的內存開銷。要知道,從kafka里讀出來的event都是順序offset的,處理也是順序,只要記錄一個offset就可以跟蹤整個流的消費進度了。1個int,相比100萬個int,storm的per message ack的機制對于流式處理的進度跟蹤來說,沒有利用消息處理的有序性(storm根本上假設message之間是彼此獨立處理的),而變得效率低下。
要做到強一致是很困難的,它需要把
更新保存的offset
更新插入分析數據庫
變成一個原子事務來完成。大部分分析數據庫都沒有原子性事務的能力,連插入三條數據都不能保持同時變為可見,且不說還要用它來記錄offset了。考慮到kafka在生產端都無法提供分布式事務,event從生產出來就不是完全一致的(多產生了或者少產生了),真正高一致的計費場景還是用其他的技術棧。所以值得解決的問題是,如何在重啟之后,把之前重啟的時候丟棄掉的內存狀態重新恢復出來,使得統計出來的曲線仍然是連續的。
解決思路有三點:
上游備份策略:重啟的時候重放kafka的歷史數據,恢復內存狀態
中間狀態持久化:把統計的狀態放到外部的持久的數據庫里,不放內存里
同時跑兩份:同時有兩個完全一樣的統計任務,重啟一個,另外一個還能正常運行。
內存狀態管理的問題做流式統計的有兩種做法:
依賴于外部存儲管理狀態:比如沒收到一個event,就往redis里發incr增1
純內存統計:在內存里設置一個counter,每收到一個event就+1
基于外部存儲會把整個壓力全部壓到數據庫上。一般來說流式統計的流速是很快的,遠大于普通的關系型數據庫,甚至可能會超過單臺redis的承載。這就使得基于純內存的統計非常有吸引力。大部分的時候都是在更新時間窗口內的內存狀態,只有當時間窗口關閉的時候才把數據刷到分析數據庫里去。刷數據出去的同時記錄一下當前流消費到的位置(offset)。
這種純內存的狀態相對來說容易管理一些。計算直接是基于這個內存狀態做的。如果重啟丟失了,重放一段歷史數據就可以重建出來。
但是內存的問題是它總是不夠用的。當統計的維度組合特別多的時候,比如其中某個字段是用戶的id,那么很快這個內存狀態就會超過單機的內存上限。這種情況有兩種辦法:
利用partition把輸入的input分割,一個流分成多個流,每個統計程序需要跟蹤的維度組合就變少了
把存儲移到外邊去
簡單地在流式統計程序里開關數據庫連接是可以解決這個容量問題的:
但是這種對外部數據庫使用不小心就會導致兩個問題:
處理速度慢。不用一些批量的操作,數據庫操作很快就會變成瓶頸
數據庫的狀態不一直。內存的狀態重啟了就丟失了,外部的狀態重啟之后不丟失。重放數據流就可能導致數據的重復統計
但是這種把窗口統計的中間狀態落地的好處也是顯而易見的。重啟之后不用通過重算來恢復內存狀態。如果一個時間窗口有24小時,重算24小時的歷史數據可能是很昂貴的操作。
版本跟蹤,批量等都不應該是具體的統計邏輯的實現者的責任。理論上框架應該負責把冷熱數據分離,自動把冷數據下沉到外部的存儲,以把本地內存空閑出來。同時每次小批量處理event的時候都要記錄處理的offset,而不是要等到窗口關閉等待時候。
數據庫狀態和內存狀態要變成一個緊密結合的整體。可以把兩者的關系想象成操作系統的filesystem page cache。用mmap把狀態映射到內存里,由框架負責什么時候把內存里的變更持久化到外部存儲里。
總結基于storm做流式統計缺乏對以下四個基本問題的成熟解決方案。其trident框架可能可以提供一些答案,但是實踐中好像使用的人并不多,資料也太少了。可以比較自信的說,不僅僅是storm,對于大多數流式計算平臺都是如此。
時間窗口切分的問題
多流合并的問題
數據一致性問題(重啟之后曲線斷開的問題)
內存狀態管理問題
這些問題要好好解決,還是需要一番功夫的。新一代的流式計算框架比如spark streaming/flink應該有很多改進。即便底層框架提供了支持,從這四個角度去考察一下它們是如何支持的也是非常有裨益的事情。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/17474.html
摘要:為了適應流式渲染技術對網絡高吞吐零緩沖的特點,可能需要對現有網絡協議進行改造主要針對。視頻基于的,視頻在客戶端的播放會相對較為容易。輸入信號各自隔離處理即可,瀏覽器端對常見的輸入信號幾乎都有支持。 本文首發于我的博客(點此查看),歡迎關注。 流式渲染技術,不同于傳統意義上前端領域的服務端渲染(即 SSR),指的是云端性能強勁的機器進行畫面渲染,將渲染完成的數據傳送至客戶端,客戶端只負責...
閱讀 819·2023-04-25 19:40
閱讀 3416·2023-04-25 17:41
閱讀 2998·2021-11-11 11:01
閱讀 2604·2019-08-30 15:55
閱讀 3222·2019-08-30 15:44
閱讀 1352·2019-08-29 14:07
閱讀 481·2019-08-29 11:23
閱讀 1320·2019-08-27 10:54