摘要:當我們正準備做前期調研和設計的時候,主辦方把唐長老拉去做現場導師,參賽規則規定導師不能下場比賽,囧,于是就這樣被被動放了鴿子。川總早早來到現場。
本文作者是來自 TiBoys 隊的崔秋同學,他們的項目 TBSSQL 在 TiDB Hackathon 2018 中獲得了一等獎。序TiDB Batch and Streaming SQL(簡稱 TBSSQL)擴展了 TiDB 的 SQL 引擎,支持用戶以類似 StreamSQL 的語法將 Kafka、Pulsar 等外部數據源以流式表的方式接入 TiDB。通過簡單的 SQL 語句,用戶可以實現對流式數據的過濾,流式表與普通表的 Join(比如流式事實表與多個普通維度表),甚至通過 CREATE TABLE AS SELECT 語法將處理過的流式數據寫入普通表中。此外,針對流式數據的時間屬性,我們實現了基于時間窗口的聚合/排序算子,使得我們可以對流式數據進行時間維度的聚合/排序。
算起來這應該是第三次參加的 Hackathon 了,第一次參加的時候還是在小西天的豌豆莢,和東旭一起,做跨平臺數據傳輸的工具,兩天一夜;第二次和奇叔一起在 3W 咖啡,又是兩天一夜;這次在自己家舉辦 Hackathon 比賽,下定決心一定要佛性一些,本著能抱大腿就不單干的心態,迅速決定拉唐長老(唐劉)下水。接下來就計劃著折騰點啥,因為我們兩個前端都不怎么樣,所以只能硬核一些,于是拍了兩個方案。
方案一:之前跟唐長老合作過很長一段時間,我們兩個對于測試質量之類的事情也都非常關注,所以想著能不能在 Chaos 系統上做一些文章,把一些前沿的測試理論和經驗方法結合到系統里面來,做一套通用的分布式系統測試框架,就像 Jepsen 那樣,用這套系統去測試和驗證主流的開源分布式項目。
方案二:越接近于業務實時性的數據處理越有價值,不管是 Kafka/KSQL,Flink/Spark Streaming 都是在向著實時流計算領域方向進行未來的探索。TiDB 雖然已經能夠支持類 Real Time OLAP 的場景,但是對于更實時的流式數據處理方面還沒有合適的解決方案,不過 TiDB 具有非常好的 Scale 能力,天然的能存儲海量的數據庫表數據,所以在 Streaming Event 和 Table 關聯的場景下具有非常明顯的優勢。如果在 TiDB 上能夠實現一個 Streaming SQL 的引擎,實現 Batch/Streaming 的計算融合,那將會是一件非常有意思的事情。
因為打 Hackathon 比賽主要是希望折騰一些新的東西,所以我們兩個簡單討論完了之后還是傾向于方案二,當然做不做的出來另說。
當我們正準備做前期調研和設計的時候,Hackathon 主辦方把唐長老拉去做現場導師,參賽規則規定導師不能下場比賽,囧,于是就這樣被被動放了鴿子。好在后來遇到了同樣被霸哥(韓飛)當導師而放鴿子的川總(杜川),川總對于 Streaming SQL 非常感興趣,于是難兄難弟一拍即合,迅速決定抱團取暖。隨后,Robot 又介紹了同樣還沒有組隊的社區小伙伴 GZY(高志遠),這樣算是湊齊了三個人,但是一想到沒有前端肯定搞不定,于是就拜托娘家人(Dashbase)的交際小王子 WPH(王鵬翰)出馬,幫助去召喚一個靠譜的前端小伙伴,后來交際未果直接把自己賣進了隊伍,這樣終于湊齊了四后端,不,應該是三后端 + 一偽前端的組合。
因為馬上要準備提交項目和團隊名稱,大家都一致覺得方案二非常有意思,所以就選定了更加儒雅的 TBSSQL(TiDB Batch and Streaming SQL)作為項目名稱,TSBSQL 遺憾落選。在團隊名稱方面,打醬油老男孩 / Scboy / TiStream / 養生 Hackathon / 佛系 Hackathon 都因為不夠符合氣質被遺憾淘汰,最后代表更有青春氣息的 TiBoys 入選(跟著我左手右手一個慢動作,逃……
前期準備所謂 “三軍未動, 糧草先行”,既然已經報名了,還是要稍作準備,雖然已經確定了大的方向,但是具體的落地方案還沒有細化,而且人員的分工也不是太明確。又經過一輪簡單的討論之后,明確了大家的職責方向,我這邊主要負責項目整體設計,進度管理以及和 TiDB 核心相關的代碼,川總主要負責 TiDB 核心技術攻關,GZY 負責流數據源數據的采集部分,WPH 負責前端展現以及 Hackathon 當天的 Demo 演示,分工之后大家就開始分頭調研動工。
作為這兩年來基本沒怎么寫過代碼的退役型選手來說,心里還是非常沒底的,也不知道現在 TiDB 代碼結構和細節變成什么樣了,不求有功,但求別太拖后腿。
對于項目本身的典型應用場景,大家還是比較明確的,覺得這個方向是非常有意義的。
應用層系統:實時流事件和離線數據的關聯查詢,比如在線廣告推薦系統,在線推薦系統,在線搜索,以及實時反欺詐系統等。
內部數據系統:
實時數據采樣統計,比如內部監控系統;
時間窗口數據分析系統,比如實時的數據流數據分析(分析一段時間內異常的數據流量和系統指標),用于輔助做 AI Ops 相關的事情(比如根據數據流量做節點自動擴容/自動提供參數調優/異常流量和風險報告等等)。
業界 Streaming 相關的系統很多,前期我這邊快速地看了下能不能站在巨人的肩膀上做事情,有沒有可借鑒或者可借用的開源項目。
Apache Beam
本質上 Apache Beam 還是一個批處理和流處理融合的 SDK Model,用戶可以在應用層使用更簡單通用的函數接口實現業務的處理,如果使用 Beam 的話,還需要實現自定義的 Runner,因為 TiDB 本身主要的架構設計非常偏重于數據庫方向,內部并沒有特別明確的通用型計算引擎,所以現階段基本上沒有太大的可行性。當然也可以選擇用 Flink 作為 Runner 連接 TiDB 數據源,但是這就變成了 Flink&TiDB 的事情了,和 Beam 本身關系其實就不大了。
Apache Flink / Spark Streaming
Flink 是一個典型的流處理系統,批處理可以用流處理來模擬出來。
本身 Flink 也是支持 SQL 的,但是是一種嵌入式 SQL,也就是 SQL 和應用程序代碼寫在一起,這種做法的好處是可以直接和應用層進行整合,但是不好的地方在于,接口不是太清晰,有業務侵入性。阿里內部有一個增強版的 Flink 項目叫 Blink,在這個領域比較活躍。如果要實現批處理和流處理融合的話,需要內部定制和修改 Flink 的代碼,把 TiDB 作為數據源對接起來,還有可能需要把一些環境信息提交給 TiDB 以便得到更好的查詢結果,當然或許像 TiSpark 那樣,直接 Flink 對接 TiKV 的數據源應該也是可以的。因為本身團隊對于 Scala/Java 代碼不是很熟悉,而且 Flink 的模式會有一定的侵入性,所以就沒有在這方面進行更多的探索。同理,沒有選擇 Spark Streaming 也是類似的原因。當然有興趣的小伙伴可以嘗試下這個方向,也是非常有意思的。
Kafka SQL
因為 Kafka 本身只是一個 MQ,以后會向著流處理方向演進,但是目前并沒有實現批處理和流處理統一的潛力,所以更多的我們只是借鑒 Kafka SQL 的語法。目前 Streaming SQL 還沒有一個統一的標準 SQL,Kafka SQL 也只是一個 SQL 方言,支持的語法還比較簡單,但是非常實用,而且是偏交互式的,沒有業務侵入性。非常適合在 Hackathon 上做 Demo 演示,我們在項目實現中也是主要參考了 Kafka SQL 的定義,當然,Flink 和 Calcite 也有自己定義的 Streaming 語法,這里就不再討論了。
調研準備工作討論到這里基本上也就差不多了,于是我們開始各自備(hua)戰(shui),出差的出差,加班的加班,接客戶的接客戶,學 Golang 的學 Golang,在這種緊(fang)張(fei)無(zi)比(wo)的節奏中,迎來了 Hackathon 比賽的到來。
Hackathon 流水賬具體的技術實現方面都是比較硬核的東西,細節也比較多,扔在最后面寫,免的大家看到一半就點×了。Day 1 3:30 AM至于參加 Hackathon 的感受,因為不像龍哥那么文豪,也不像馬老師那么俏皮,而且本來讀書也不多,所以也只能喊一句“黑客馬拉松真是太好玩了”!
由于飛機晚點,川總這個點兒才輾轉到酒店。睡覺之前非常擔心一覺睡過頭,讓這趟 Hackathon 之旅還沒開始就結束了,沒想到躺下以后滿腦子都是技術細節,怎么都睡不著。漫漫長夜,無眠。
7:45 AM川總早早來到 Hackathon 現場。由于來太早,其他選手都還沒到,所以他提前刺探刺探敵情的計劃也泡湯了,只好在賽場瞎晃悠一番熟悉熟悉環境,順道跟大獎合了個影。
11:00 AM簡單的開幕式之后,Hackathon 正式開始。我們首先搞定的是 Streaming SQL 的語法定義以及 Parser 相關改動。這一部分在之前就經過比較詳細的在線討論了,所以現場只需要根據碰頭后統一的想法一頓敲敲敲就搞定了。快速搞定這一塊以后,我們就有了 SQL 語法層面的 Streaming 實現。當然此時 Streaming 也僅限于語法層面,Streaming 在 SQL 引擎層面對應的其實還是普通的TiDB Table。
接下來是 DDL 部分。這一塊我們已經想好了要復用 TiDB Table 的 Meta 結構 TableInfo ,因此主要工作就是按照 DDL源碼解析 依葫蘆畫瓢,難度也不大,以至于我們還有閑心糾結一下 SHOW TABLES 語法里到底要不要屏蔽掉 Streaming Table 的問題。
整體上來看上午的熱身活動還是進行的比較順利的,起碼 Streaming DDL 這塊沒有成為太大的問題。這里面有個插曲就是我在 Hackathon 之前下載編譯 TiDB,結果發現 TiDB 的 parser 已經用上時髦的 go module 了(也是好久好久沒看 TiDB 代碼),折騰好半天,不過好處就是 Hackathon 當天的時候改起來 parser 就比較輕車熟路了,所以賽前編譯一個 TiDB 還是非常有必要的。
15:30 PM隨著熱身的結束,馬上迎來了穩定的敲敲敲階段。川總簡單弄了一個 Mock 的 StreamReader 然后丟給了我,因為我之前寫 TiDB 的時候,時代比較遙遠,那時候都還在用周 sir 的 Datum,現在一看,為了提高內存效率和性能,已經換成了高大上的 Chunk,于是一個很常見的問題:如何用最正確的做法把一個傳過來的 Json 數據格式化成 Table Row 數據放到 Chunk 里面,讓徹底我懵逼了。
這里面倒不是技術的問題,主要是類型太多,如果枚舉所有類型,搞起來很麻煩,按道理應該有更輕快的辦法,但是翻了源代碼還是沒找到解決方案。這個時候果斷去求助現場導師,也順便去賽場溜(ci)達(tan)一(di)圈(qing)。隨便掃了一眼,驚呆了,龍哥他們竟然已經開始寫 PPT 了,之前知道龍哥他們強,但是沒想到強到這個地步,還讓不讓大家一塊歡快地玩耍了。同時,也了解到了不少非常有意思的項目,比如用機器學習方法去自動調節 TiDB 的調度參數,用 Lua 給 TiKV 添加 UDF 之類的,在 TiDB 上面實現異構數據庫的關聯查詢(簡直就是 F1 的大一統,而且聽小道消息,他們都已經把 Join 推到 PG 上面去了,然而我們還沒開始進入到核心開發流程),在 TiKV 上面實現時序數據庫和 Memcached 協議等等,甚至東旭都按捺不住自己 Hackathon 起來了(嘻嘻,可以學學我啊 ;D )。
本來還想去聊聊各個項目的具體實現方案,但是一想到自己挖了一堆坑還沒填,只能默默回去膜拜 TiNiuB 項目。看起來不能太佛系了,于是乎我趕緊召開了一次內部團隊 sync 的 catch up,明確下分工,川總開始死磕 TBSSQL 的核心邏輯 Streaming Aggregation 的實現,我這邊繼續搞不帶 Aggregation 的 Streaming SQL 的其他實現,GZY 已經部署起來了 Pulsar,開始準備 Mock 數據,WPH 輔助 GZY 同時也快速理解我們的 Demo 場景,著手設計實現前端展現。
18:00 PM我這邊和面帶慈父般欣慰笑容的老師(張建)進行了一些技術方案實現上的交流后,了解到目前社區小伙伴已經在搞 CREATE TABLE AS SELECT 的重要信息(后續證明此信息值大概一千塊 RMB)。
此時,在解決了之前的問題之后,TBSSQL 終于能跑通簡單的 SELECT 語句了。我們心里稍微有點底了,于是一鼓作氣,順路也實現了帶 Where 條件的 Stream Table 的 SELECT,以及 Stream Table 和 TiDB Table 的多表 Join,到這里,此時,按照分工,我這邊的主體工作除了 Streaming Position 的持久化支持以外,已經寫的差不多了,剩下就是去實現一些 Nice to have 的 DDL 的語法支持。川總這里首先要搞的是基于時間窗口的 Streaming Aggregation。按照我們的如意算盤,這里基本上可以復用 TiDB 現有的 Hash Aggregation 的計算邏輯,只需要加上窗口的處理就完事兒了。
不過實際下手的時候仔細一研究代碼,發現 Aggregation 這一塊代碼在川總疏于研究這一段時間已經被重構了一把,加上了一個并發執行的分支,看起來還挺復雜。于是一不做二不休,川總把 Hash Aggregation 的代碼拷了一份,刪除了并發執行的邏輯,在比較簡單的非并發分支加上窗口相關實現。不過這種方法意味著帶時間窗口的 Aggregation 得多帶帶出 Plan,Planner 上又得改一大圈。這一塊弄完以后,還沒來得及調試,就到吃晚飯的點兒了。
21:00 PM吃完晚飯,因為下午死磕的比較厲害,我和張建、川總出門去園區溜達了一圈。期間張建問我們搞得咋樣了,我望了一眼川總,語重心長地說主要成敗已經不在我了(后續證明這句語重心長至少也得值一千塊 RMB),川總果斷信心滿滿地說問題不大,一切盡在掌握之中。
沒想到這個 Flag 剛立起來還是溫的,就立馬被打臉了。問題出在吃飯前搞的聚合那塊(具體細節可以看下后面的坑系列),為了支持時間窗口,我們必須確保 Streaming 上的窗口列能透傳到聚合算子當中,為此我們屏蔽了優化器中窗口聚合上的列裁剪規則。可是實際運行當中,我們的修改并沒有生效???而此時,川總昨天一整晚沒睡覺的副作用開始顯現出來了,思路已經有點不太清醒了。于是我們把張建拖過來一起 debug。然后我這邊也把用 TiDB Global Variable 控制 Streaming Position 的功能實現了,并且和 GZY 這邊也實現了 Mock 數據。
之后,我也順路休息休息,畢竟川總這邊搞不定,我們這邊搞的再好也沒啥用。除了觀摩川總和張建手把手,不,肩并肩結對小黑屋編程之外,我也順便申請了部署 Kafka 聯調的機器。
23:00 PM我們這邊最核心的功能還沒突破,亮眼的 CREATE TABLE AS SELECT Streaming 也還沒影,其實中期進度還是偏慢了(或者說之前我設計實現的功能的工作量太大了,看起來今天晚上只能死磕了,囧)。我調試 Kafka 死活調不通,端口可以 Telnet 登陸,但是寫入和獲取數據的時候一直報超時錯誤,而且我這邊已經開始困上來了,有點扛不動了,后來在 Kafka 老司機 WPH 一起看了下配置參數,才發現 Advertise URL 設置成了本地地址,換成對外的 IP 就好了,當然為了簡單方便,我們設置了單 Partition 的 Topic,這樣 collector 的 Kafka 部分就搞的差不多了,剩下就是實現一個 http 的 restful api 來提供給 TiDB 的 StreamReader 讀取,整個連通工作就差不多了。
Day 2 00:00 AM這時候川總那邊也傳來了好消息,終于從 Streaming Aggregation 這個大坑里面爬出來了,后面也比較順利地搞定了時間窗口上的聚合這塊。此時時間已經到了 Hackathon 的第二天,不少其他項目的小伙伴已經收攤回家了。不過我們抱著能多做一個 Feature 是一個的心態,決定挑燈夜戰。首先,川總把 Sort Executor 改了一把以支持時間窗口,可能剛剛的踩坑經歷為我們攢了人品,Sort 上的改動竟然一次 AC 了。借著這股勁兒,我們又回頭優化了一把 SHOW CREATE STREAM 的輸出。
這里有個插曲就是為了近距離再回味和感受下之前的開發流程,我們特意在 TiDB 的 repo 里面開了一個 tiboys/hackathon 的分支,然后提交的時候用了標準的 Pull Request 的方式,點贊了才能 merge(后來想想打 Hackathon 不是太可取,沒什么用,還挺耽誤時間,不知道當時怎么想的),所以在 master 分支和 tiboys/hackathon 分支看的時候都沒有任何提交記錄。嘻嘻,估計龍哥也沒仔細看我們的 repo,所以其實在龍哥的激勵下,我們的效率還是可以的 :) 。
2:30 AMGZY 和 WPH 把今天安排的工作完成的差不多了,而且第二天還靠他們主要準備 Demo Show,就去睡覺了,川總也已經困得不行了,準備打烊睡覺。我和川總合計了一下,還差一個最重要的 Feature,抱著就試一把,不行就手工的心態,我們把社區的小伙伴王聰(bb7133)提的支持 CREATE TABLE AS SELECT 語法的 PR 合到了我們的分支,沖突竟然不是太多,然后稍微改了一下來支持 Streaming,結果一運行奇跡般地發現竟然能夠運行,RP 全面爆發了,于是我們就近乎免費地增加了一個 Feature。改完這個地方,川總實在堅持不住了,就回去睡了。我這邊的 http restful api 也搞的差不多了,準備聯調一把,StreamReader 通過 http client 從 collector 讀數據,collector 通過 kafka consumer 從 kafka broker 獲取數據,結果獲取的 Json 數據序列化成 TiDB 自定義的 Time 類型老是出問題,于是我又花了一些時間給 Time 增加了 Marshall 和 Unmarshal 的格式化支持,到這里基本上可以 work 了,看了看時間,凌晨四點半,我也準備去睡了。期間好幾次看到霸哥(韓飛)凌晨還在一直幫小(tian)伙(zi)伴(ji)查(wa)問(de)題(keng),其實霸哥認真的時候還是非常靠譜的。
7:30 AM這個時候人陸陸續續地來了,我這邊也進入了打醬油的角色,年紀大了確實剛不動了,吃了早餐之后,開始準備思考接下來的分工。因為大家都是臨時組隊,到了 Hackathon 才碰面,基本上沒有太多磨合,而且普遍第二天狀態都不大好。雖然大家都很努力,但是在我之前設計的宏大項目面前,還是感覺人力不太夠,所以早上 10 點我們開了第二次 sync 的 catch up,討論接下來的安排。我去負責更新代碼和 GitHub 的 Readme,川總最后再簡單對代碼掃尾,順便和 GZY 去錄屏(羅伯特小姐姐介紹的不翻車經驗),WPH 準備畫圖和 PPT,因為時間有限,前端展現部分打算從賣家秀直接轉到買家秀。11 點敲定代碼完全封板,然后安心準備 PPT 和下午的 Demo。
14:00 PM因為抽簽抽的比較靠后,主要事情在 WPH 這邊,我和川總基本上也沒什么大事了,順手搞了幾幅圖,然后跟馬老師還有其他項目的小伙伴們開始八卦聊天。因為正好周末,家里妹子買東西順便過來慰問了下。下午主要聽了各個 Team 的介紹,欣賞到了極盡浮夸的 LOGO 動畫,Get 到了有困難找 Big Brother 的新技能,學習和了解了很有意思的 Idea,真心覺得這屆 Hackathon 做的非常值得回憶。
從最后的現場展示情況來看,因為 TBSSQL 內容比較多,真的展示下來,感覺 6 分鐘時間還是太趕,好在 WPH Demo 的還是非常順利的,把我們做的事情都展示出來了。因為砍掉了一些前端展現的部分(這塊我們也確實不怎么擅長),其實對于 Hackathon 項目是非常吃虧的,不過有一點比較欣慰,就像某光頭大佬說的,評委們都是懂技術的。因為實現完整性方面能做的也都搞差不多了,打的雖然很累但是也很開心,對于結果也就不怎么糾結了。
因為川總晚上的飛機,小伙伴們簡單溝通了幾句,一致同意去園區找個地吃個晚飯,于是大家拉上霸哥去了“頭一號”,也是第一次吃了大油條,中間小伙伴們各種黑誰誰誰寫的 bug 巴拉巴拉的,后來看手機群里有人 @ 我說拿獎了。
其實很多項目各方面綜合實力都不錯,可以說是各有特色,很難說的上哪個項目有絕對的優勢。我們之前有討論過,TBSSQL 有獲獎的贏面,畢竟從完整性,實用性和生態方面都是有潛質的,但是能獲得大家最高的認可還是小意外的,特別感謝各位技術大佬們,也特別感謝幫助我們領獎的滿分羅伯特小姐姐。
最后大家補了一張合照,算是為這次 Hackathon 畫下一個句號。
至此,基本上 Hackathon 的流水賬就記錄完了,整個項目地址在 https://github.com/qiuyesuifeng/tidb 歡迎大家關注和討論。
選讀:技術實現TLDR: 文章很長,挑感興趣的部分看看就可以了。
在前期分析和準備之后,基本上就只有在 TiDB 上做 SQL Streaming 引擎一條路可選了,細化了下要實現的功能以及簡單的系統架構,感覺工作量還是非常大的。
下面簡單介紹下系統架構和各個模塊的功能:
在數據源采集部分(collector),我們計劃選取幾種典型的數據源作為適配支持。
Kafka
最流行的開源 MQ 系統,很多 Streaming 系統對接的都是 Kafka。
Pulsar
流行的開源 MQ 系統,目前比較火爆,有趕超 Kafka 的勢頭。
Binlog
支持 MySQL/TiDB Binlog 處理,相當于是 MySQL Trigger 功能的升級加強版了。我們對之前的 MySQL -> TiDB 的數據同步工具 Syncer 也比較熟悉,所以這塊工作量應該也不大。
Log
常見的 Log 日志,這個就沒什么好解釋的了。
為了方便 Demo 和協作,collector 除了適配不同的數據源,還會提供一個 restful api 的接口,這樣 TBSSQL 就可以通過 pull 的方式一直獲取 streaming 的數據。因為 collector 主要是具體的工程實現,所以就不在這里細節展開了,感興趣的話,可以參考下 相關代碼。
要在 TiDB 中實現 Streaming 的功能即 TBSSQL,就需要在 TiDB 內部深入定制和修改 TiDB 的核心代碼。
Streaming 有兩個比較本質的特征:
Streaming 具有流式特性,也就是說,其數據可以是一直增長,無窮無盡的。而在 Batch 系統(暫時把 MySQL/TIDB 這種數據在一定時間內相對穩定的系統簡稱 Batch 系統,下面都會沿用這種說法)當中,每個 SQL 的輸入數據集是固定,靜態的。
Streaming 具有時序特性。每一條數據都有其內在的時間屬性(比如說事件發生時間等),數據之間有先后順序關系。而在 Batch 系統當中,一個表中的數據在時間維度上是無序的。
因此,要在 TiDB SQL 引擎上支持 Streaming SQL,所涉及到的算子都需要根據 Streaming 的這兩個特點做修改。以聚合函數(Aggregation)為例,按照 SQL 語義,聚合算子的實現應該分成兩步:首先是 Grouping, 即對輸入按照聚合列進行分組;然后是 Execute, 即在各個分組上應用聚合函數進行計算,如下圖所示。
對于 Streaming,因為其輸入可以是無盡的,Grouping 這個階段永遠不可能結束,所以按照老套路,聚合計算就沒法做了。這時,就要根據 Streaming 的時序特性對 Streaming 數據進行分組。每一個分組被稱為一個 Time Window(時間窗口)。就拿最簡單的 Tumbling Window 來說,可以按照固定的時間間隔把 Streaming 輸入切分成一個個相互無交集的窗口,然后在每一個窗口上就可以按照之前的方式進行聚合了。
聚合算子只是一個比較簡單的例子,因為其只涉及一路輸入。如果要修改多路輸入的算子(比如說 Join 多個 Streaming),改動更復雜。此外,時間窗口的類型也是多種多樣,剛剛例子中的 Tumbling Window 只是基礎款,還有復雜一點的 Hopping Window 以及更復雜的 Sliding Window。在 Hackathon 的有限時間內,我們既要考慮實現難度,又要突出 Batch / Streaming 融合處理的特點,因此在技術上我們做出如下抉擇:
時間窗口只做最基本的 Tumbling Window。
實現基于時間窗口的 Aggregation 和 Sort 作為經典流式算子的代表。
實現單 Streaming Join 多 Batch Table 作為 Batch / Streaming 融合的示例, 多個 Streaming Join 太復雜,因為時間有限就先不做了。
支持 Streaming 處理結果寫入 Batch Table(TiDB Table)這種常見但是非常實用的功能。也就是說要支持 CREATE TABLE AS SELECT xxx FROM streaming 的類似語法。
此外,既然是要支持 Streaming SQL,選擇合適的 SQL 語法也是必要的,需要在 Parser 和 DDL 部分做相應的修改。單整理下,我們的 Feature List 如下圖所示:
下面具體聊聊我們實現方案中的一些關鍵選擇。
Streaming SQL 語法
Streaming SQL 語法的核心是時間窗口的定義,Time Window 和一般 SQL 中的 Window Function 其實語義上是有區別的。在 Streaming SQL 中,Time Window 主要作用是為后續的 SQL 算子限定輸入的范圍,而在一般的 SQL 中,Window Funtion 本身就是一個 SQL 算子,里面的 Window 其實起到一個 Partition 的作用。
在純 Streaming 系統當中,這種語義的差別影響不大,反而還會因為語法的一致性降低用戶的學習成本,但是在 TBSSQL 這種 Batch / Streaming 混合場景下,同一套語法支持兩種語義,會對用戶的使用造成一定困擾,特別是在 TiDB 已經被眾多用戶應用到生產環境這種背景下,這種語義上的差別一定要體現在語法的差異上。
Sreaming DDL
DDL 這一塊實現難度不大,只要照著 DDL源碼解析 依葫蘆畫瓢就行。這里值得一提的是在 Meta 層,我們直接(偷懶)復用了 TableInfo 結構(加了判斷是否為 Streaming 的 Flag 和一些表示 Streaming 屬性的字段)來表示 Streaming Table。這個選擇主要是從實現難度上考慮的,畢竟復用現有的結構是最快最安全的。但是從設計思想上看,這個決定其實也暗示了在 TBSSQL 當中,Streaming 是 Table 的一種特殊形式,而不是一個獨立的概念。理解這一點很重要,因為這是一些其他設計的依據。比如按照以上設定,那么從語義上講,在同一個 DB 下 Streaming 和普通 Table 就不能重名,反之的話這種重名就是可以接受的。
StreamReader
這一塊主要有兩個部分,一個是適配不同的數據源(collector),另一個是將 Streaming 數據源引入 TiDB 計算引擎(StreamReader)。collector 這部分上面已經介紹過了,這里就不再過多介紹了。StreamReader 這一塊,主要要修改由 LogicalPlan 生成 PhysicalPlan(具體代碼),以及由 PhysicalPlan 生成 Executor Operator Tree 的過程(具體代碼)。StreamReader 的 Open 方法中,會利用 Meta 中的各種元信息來初始化與 collector 之間的連接,然后在 Next 方法中通過 Pull 的方式不斷拉取數據。
對時間窗口的處理
前面我們提到,時間窗口是 Streaming 系統中的核心概念。那么這里就有一個重要的問題,Time Window 中的 Time 如何界定?如何判斷什么時候應該切換 Window?最容易想到,也是最簡單粗暴的方式,就是按照系統的當前時間來進行切割。這種方式問題很大,因為:
數據從生成到被 TBSSQL 系統接收到,肯定會有一定的延遲,而且這個延遲時間是沒有辦法精確預估的。因此在用戶實際場景中,除非是要測量收發延遲,這個系統時間對用戶沒有太大意義。
考慮到算子并發執行的可能性(雖然還沒有實現),不同機器的系統時間可能會有些許偏差,這個偏差對于 Window 操作來說可能導致致命的誤差,也會導致結果的不精確(因為 Streaming 源的數據 Shuffle 到不同的處理節點上,系統時間的誤差可能不太一樣,可能會導致 Window 劃分的不一樣)。
因此,比較合理的方式是以 Streaming 中的某一 Timestamp 類型的列來切分窗口,這個值由用戶在應用層來指定。當然 Streaming 的 Schema 中可能有多個 Timestamp 列,這里可以要求用戶指定一個作為 Window 列。在實現 Demo 的時候,為了省事,我們直接限定了用戶 Schema 中只能有一個時間列,并且以該列作為 Window 列([具體代碼](https://github.com/qiuyesuifeng/tidb/blob/656971da00a3b1f81f5085aaa277159868fca223/ddl/table.go#L58))。當然這里帶來一個問題,就是 Streaming 的 Schema 中必須有 Timestamp 列,不然這里就沒法玩了。為此,我們在創建 Streaming 的 DDL 中加了 [檢查邏輯](https://github.com/qiuyesuifeng/tidb/blob/656971da00a3b1f81f5085aaa277159868fca223/ddl/ddl_api.go#L149),強制 Streaming 的 Schema 必須有 Timestamp 列(其實我們也沒想明白當初 Hackathon 為啥要寫的這么細,這些細節為后來通宵埋下了濃重的伏筆,只能理解為程序猿的本能,希望這些代碼大家看的時候吐槽少一些)。
Streaming DML
這里簡單 DML 指的就是不依賴時間窗口的 DML,比如說只帶 Selection 和 Projection 的SELECT 語句,或者單個 Streaming Join 多個 Table。因為不依賴時間窗口,支持這類 DML 實際上不需要對計算層做任何改動,只要接入 Streaming 數據源就可以了。
對于 Streaming Join Table(如上圖表示的是 Stream Join User&Ads 表的示意圖) 可以多說一點,如果不帶 Time Window,其實這里需要修改一下Planner。因為 Streaming 的流式特性,這里可能沒法獲取其完整輸入集,因此就沒法對 Streaming 的整個輸入進行排序,所以 Merge Join 算法這里就沒法使用了。同理,也無法基于 Streaming 的整個輸入建 Hash 表,因此在 Hash Join 算法當中也只能某個普通表 Build Hash Table。不過,在我們的 Demo 階段,輸入其實也是還是有限的,所以這里其實沒有做,倒也影響不大。
基于時間窗口的 Aggregation 和 Sort
在 TBSSQL 當中,我們實現了基于固定時間窗的 Hash Aggregation Operator 和 Sort Operator。這里比較正規的打法其實應該是實現一個獨立的 TimeWindow,各種基于時間窗口的 Operator 可以切換時間窗的邏輯,然后比如 Aggregation 和 Sort 這類算子只關心自己的計算邏輯。 但是這樣一來要對 Planner 做比較大的改動,想想看難度太大了,所以我們再一次采取了直(tou)接(lan)的方法,將時間窗口直接實現分別實現在 Aggregation 和 Sort 內部,這樣 Planner 這塊不用做傷筋動骨的改動,只要在各個分支邏輯上修修補補就可以了。
對于 Aggregation,我們還做了一些額外的修改。Aggregation 的輸出 Schema 語義上來說只包括聚合列和聚合算子的輸出列。但是在引入時間窗口的情況下,為了區分不同的窗口的聚合輸出,我們為聚合結果顯式加上了兩個 Timestamp 列 window_start 和 window_end, 來表示窗口的開始時間和結束時間。為了這次這個小特性,我們踩到一個大坑,費了不少勁,這個后面再仔細聊聊。
支持 Streaming 處理結果寫入 Batch Table
因為 TiDB 本身目前還暫時不支持 CREATE TABLE AS SELECT … 語法,而從頭開始搞的話工作量又太大,因此我們一度打算放棄這個 Feature。后面經過老司機提醒,我們發現社區的小伙伴王聰(bb7133)已經提了一個 PR 在做這個事情了。本著試一把的想法我們把這個 PR 合到我們的分支上一跑,結果竟然沒多少沖突,還真能 Work…...稍微有點問題的是如果 SELECT 子句中有帶時間窗口的聚合,輸出的結果不太對。仔細研究了一下發現,CREATE TABLE AS SELECT 語句中做 LogicalPlan 的路徑和直接執行 SELECT 時做 LogicalPlan 的入口不太一致,以至于對于前者,我們做 LogicalPlan 的時候遺漏了一些 Streaming 相關信息。這里稍作修改以后,也能夠正常運行了。
遇到的困難和坑本著前人采坑,后人盡量少踩的心態聊聊遇到的一些問題,主要的技術方案上面已經介紹的比較多了。限于篇幅,只描述遇到的最大的坑——消失的窗口列的故事。在做基于時間窗口的 Aggregation 的時候,我們要按照用戶指定的窗口列來切窗口。但是根據 列裁剪 規則,如果這個窗口列沒有被用作聚合列或者在聚合函數中被使用,那么這一列基本上會被優化器裁掉。這里的修改很簡單(我們以為),只需要在聚合的列裁剪邏輯中,如果發現聚合帶時間窗口,那么直接不做裁剪就完事兒了(代碼)。三下五除二修改完代碼,編譯完后一運行,結果……瞬間 Panic 了……Debug 一看,發現剛剛的修改沒有生效,Streaming 的窗口列還是被裁剪掉了,隨后我們又把 Planner 的主要流程看了一遍,還是沒有在其他地方發現有類似的裁剪邏輯。
這時我們意識到事情沒有這么簡單了,趕忙從導師團搬來老司機(還是上面那位)。我們一起用簡單粗暴的二分大法和 Print 大法,在生成 LogicalPlan,PhysicalPlan 和 Executor 前后將各個算子的 Schema 打印出來。結果發現,在 PhysicalPlan 完成后,窗口列還是存在的,也就是說我們的修改是生效了的,但是在生成 Executor 以后,這一列卻神秘消失了。所以一開始我們定位的思路就錯了,問題出在生成 Executor 的過程,但是我們一直在 Planner 中定位,當然找不到問題。
明確了方向以后,我們很快就發現了元兇。在 Build HashAggregation 的時候,有一個不起眼的函數調用 buildProjBelowAgg,這個函數悄悄地在 Aggregation 算子下面加塞了一個 Projection 算子,順道又做了一把列裁剪,最為頭疼的是,因為這個 Projection 算子是在生成 Executor 階段才塞進去的,而 EXPLAIN 語句是走不到這里來的,所以這個 Projection 算子在做 Explain 的時候是看不見的,想當于是一個隱形的算子,所以我們就這樣華麗麗地被坑了,于是就有了羅伯特小姐姐聽到的那句 “xxx,出來挨打” 的橋段。
今后的計劃從立項之初,我們就期望 TBSSQL 能夠作為一個正式的 Feature 投入生產環境。為此,在設計和實現過程中,如果能用比較優雅的解決方案,我們都盡量不 Hack。但是由于時間緊迫和能力有限,目前 TBSSQL 還是處于 Demo 的階段,離實現這個目標還有很長的路要走。
1. Streaming 數據源在對接 Streaming 數據源這塊,目前 TBSSQL 有兩個問題。首先,TBSSQL 默認輸入數據是按照窗口時間戳嚴格有序的。這一點在生產環境中并不一定成立(比如因為網絡原因,某一段數據出現了亂序)。為此,我們需要引入類似 Google MillWheel 系統中 Low Watermark 的機制來保證數據的有序性。其次,為了保證有序,目前 StreamReader 只能單線程運行。在實際生產環境當中,這里很可能因為數據消費速度趕不上上游數據生產速度,導致上游數據源的堆積,這又會反過來導致產生計算結果的時間和數據生產時間之間的延遲越來越大。為了解決這個問題,我們需要將 StreamReader 并行化,而這又要求基于時間窗口的計算算子能夠對多路數據進行歸并排序。另外,目前采用 TiDB Global Variable 來模擬 Streaming 的位置信息,其實更好地方案是設計用一個 TiDB Table 來記錄每個不同 StreamReader 讀取到的數據位置,這種做法更標準。
2. Planner在 Planner 這塊,從前面的方案介紹可以看出,Streaming 的流式特性和時序特性決定了 Streaming SQL 的優化方式和一般 SQL 有所不同。目前 TBSSQL 的實現方式是在現有 Planner 的執行路徑上加上一系列針對 Streaming SQL 的特殊分支。這種做法很不優雅,既難以理解,也難以擴展。目前,TiDB 正在基于 Cascade 重構 Planner 架構,我們希望今后 Streaming SQL 的相關優化也基于新的 Planner 框架來完成。
3. 時間窗口目前,TBSSQL 只實現了最簡單的固定窗口。在固定窗口上,Aggregation、Sort 等算子很大程度能復用現有邏輯。但是在滑動窗口上,Aggregation、Sort 的計算方式和在 Batch Table 上的計算方式會完全不一樣。今后,我們希望 TBSSQL 能夠支持完善對各種時間窗口類型的支持。
4. 多 Streaming 處理目前 TBSSQL 只能處理單路 Streaming 輸入,比如單個 Streaming 的聚合,排序,以及單個Streaming 和多個 Table 之間的 Join。多個 Streaming 之間的 Join 因為涉及多個 Streaming 窗口的對齊,目前 TBSSQL 暫不支持,所以 TBSSQL 目前并不是一個完整的 Streaming SQL 引擎。我們計劃今后對這一塊加以完善。
TBSSQL 是一個復雜的工程,要實現 Batch/Streaming 的融合,除了以上提到這四點,TBSSQL 還有很有很多工作要做,這里就不一一詳述了。或許,下次 Hackathon 可以再繼續搞一把 TBSSQL 2.0 玩玩:) 有點遺憾的是作為選手出場,沒有和所有優秀的參賽的小伙伴們暢談交流,希望有機會可以補上。屬于大家的青春不散場,TiDB Hackathon 2019,不見不散~~
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/17869.html
摘要:在上,我司聯合創始人崔秋帶大家一起回顧了年社區成長足跡,在社區榮譽時刻環節,我們為新晉授予了證書,并為年度最佳貢獻個人團隊頒發了榮譽獎杯。同時,我們也為新晉授予了證書,并為年最佳社區貢獻個人最佳社區貢獻團隊頒發了榮譽獎杯。 2018 年 TiDB 產品變得更加成熟和穩定,同時 TiDB 社區力量也在發展壯大。在 TiDB DevCon 2019 上,我司聯合創始人崔秋帶大家一起回顧了 ...
摘要:我們非常希望本屆誕生的優秀項目能夠在社區中延續下去,感興趣的小伙伴們可以加入進來哦本文作者是來自團隊的楊文同學,他們的項目天真貝葉斯學習機在本屆中獲得了三等獎最佳創意獎。比賽前一日從廣州南站出發,次日抵達北京西站。 Ti Hack 系列 TiDB Hackathon 2018 共評選出六組優秀項目,本系列文章將由這六組項目的成員主筆,分享他們的參賽經驗和成果。我們非常希望本屆 Hack...
閱讀 2236·2021-11-24 11:15
閱讀 3079·2021-11-24 10:46
閱讀 1377·2021-11-24 09:39
閱讀 3924·2021-08-18 10:21
閱讀 1478·2019-08-30 15:53
閱讀 1395·2019-08-30 11:19
閱讀 3319·2019-08-29 18:42
閱讀 2321·2019-08-29 16:58