摘要:對批處理表的查詢不支持,和很多中常見的標量函數。此外,可以同時在靜態(tài)表和流表上進行查詢,這和的愿景是一樣的,將批處理看做特殊的流處理批看作是有限的流。最后,使用標準進行流處理意味著有很多成熟的工具支持。查詢結果直接顯示在中。
從何而來
關系型API有很多好處:是聲明式的,用戶只需要告訴需要什么,系統決定如何計算;用戶不必特地實現;更方便優(yōu)化,可以執(zhí)行得更高效。本身Flink就是一個統一批和流的分布式計算平臺,所以社區(qū)設計關系型API的目的之一是可以讓關系型API作為統一的一層,兩種查詢擁有同樣的語義和語法。大多數流處理框架的API都是比較low-level的API,學習成本高而且很多邏輯需要寫到UDF中,所以Apache Flink 添加了SQL-like的API處理關系型數據--Table API。這套API中最重要的概念是Table(可以在上面進行關系型操作的結構化的DataSet或DataStream)。Table API 與 DataSet和DataStream API 結合緊密,DataSet 和 DataStream都可以很容易地轉換成 Table,同樣轉換回來也很方便:
val execEnv = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // obtain a DataSet from somewhere val tempData: DataSet[(String, Long, Double)] = // convert the DataSet to a Table val tempTable: Table = tempData.toTable(tableEnv, "location, "time, "tempF) // compute your result val avgTempCTable: Table = tempTable .where("location.like("room%")) .select( ("time / (3600 * 24)) as "day, "Location as "room, (("tempF - 32) * 0.556) as "tempC ) .groupBy("day, "room) .select("day, "room, "tempC.avg as "avgTempC) // convert result Table back into a DataSet and print it avgTempCTable.toDataSet[Row].print()
example使用的是Scala的API,Java版API也有同樣的功能。
下圖展示了 Table API 的架構:
從 DataSet 或 DataStream 創(chuàng)建一個 Table,然后在上面進行關系型操作比如 fliter、join、select。對Table的操作將會轉換成邏輯運算符樹。Table 轉換回 DataSet 和 DataStream 的時候將會轉換成DataSet 和 DataStream的算子。有些類似 "location.like("room%") 的表達式將會通過 code generation 編譯成Flink的函數。
然而,最初傳統的Table API 有一定的限制。首先,它不能獨立使用。Table API 的 query 必須嵌入到 DataSet 或 DataStream的程序中。對批處理表的查詢不支持outer join,sorting和很多SQL中常見的標量函數。對于流處理的查詢只支持filtetr union 和 projection,不支持aggregation和join。而且,轉換過程中沒有利用太多查詢優(yōu)化技術,除了適用于所有DataSet程序的優(yōu)化。
Table API 和 SQL 緊密結合隨著流處理的日益普及和Flink在該領域的增長,Flink社區(qū)認為需要一個更簡單的API使更多的用戶能夠分析流數據。一年前Flink社區(qū)決定將Table API提升到一個新的層級,擴展Table API中流處理的能力以及支持SQL。社區(qū)不想重復造輪子,于是決定在 Apache Calcite (一個比較流行的SQL解析和優(yōu)化框架)的基礎上構建新的 Table API。Apache Calcite 被用在很多項目中,包括 Apache Hive,Apache Drill,Cascading等等。除此之外,Calcite社區(qū)將 SQL on Stream 寫入它的roadmap,所以Flink的SQL很適合和它結合。
以Calcite為核心的新架構圖:
新架構提供兩種API進行關系型查詢,Table API 和 SQL。這兩種API的查詢都會用包含注冊過的Table的catalog進行驗證,然后轉換成統一Calcite的logical plan。在這種表示中,stream和batch的查詢看起來完全一樣。下一步,利用 Calcite的 cost-based 優(yōu)化器優(yōu)化轉換規(guī)則和logical plan。根據數據源的性質(流式和靜態(tài))使用不同的規(guī)則進行優(yōu)化。最終優(yōu)化后的plan轉傳成常規(guī)的Flink DataSet 或 DataStream 程序。這步還涉及code generation(將關系表達式轉換成Flink函數)。
下面我們舉一個例子來理解新的架構。表達式轉換成Logical Plan如下圖所示:
調用Table API 實際上是創(chuàng)建了很多 Table API 的 LogicalNode,創(chuàng)建的過程中對會對整個query進行validate。比如table是CalalogNode,window groupBy之后在select時會創(chuàng)建WindowAggregate和Project,where對應Filter。然后用RelBuilder翻譯成Calcite LogicalPlan。如果是SQL API 將直接用Calcite的Parser進行解釋然后validate生成Calcite LogicalPlan。
利用Calcite內置的一些rule來優(yōu)化LogicalPlan,也可以自己添加或者覆蓋這些rule。轉換成Optimized Calcite Plan后,仍然是Calcite的內部表示方式,現在需要transform成DataStream Plan,對應上圖第三列的類,里面封裝了如何translate成普通的DataStream或DataSet程序。隨后調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各種算子。現在就相當于我們直接利用Flink的DataSet或DataStream API開發(fā)的程序。
Table API的新架構除了維持最初的原理還改進了很多。為流式數據和靜態(tài)數據的關系查詢保留統一的接口,而且利用了Calcite的查詢優(yōu)化框架和SQL parser。該設計是基于Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力而且就有exactly-once語義而且可以基于event-time進行處理。而且DataSet擁有穩(wěn)定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的所有改進都會自動應用到Table API和SQL上。
新的SQL接口集成到了Table API中。DataSteam, DataSet和外部數據源可以在TableEnvironment中注冊成表,為了是他們可以通過SQL進行查詢。TableEnvironment.sql()方法用來聲明SQL和將結果作為Table返回。下面的是一個完整的樣例,從一個JSON編碼的Kafka topic中讀取流表,然后用SQL處理并寫到另一個Kafka topic。
// get environments val execEnv = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // configure Kafka connection val kafkaProps = ... // define a JSON encoded Kafka topic as external table val sensorSource = new KafkaJsonSource[(String, Long, Double)]( "sensorTopic", kafkaProps, ("location", "time", "tempF")) // register external table tableEnv.registerTableSource("sensorData", sensorSource) // define query in external table val roomSensors: Table = tableEnv.sql( "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " + "FROM sensorData " + "WHERE location LIKE "room%"" ) // define a JSON encoded Kafka topic as external sink val roomSensorSink = new KafkaJsonSink(...) // define sink for room sensor data and execute query roomSensors.toSink(roomSensorSink) execEnv.execute()
這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操作如何用SQL表達呢?Apache Calcite社區(qū)提出了一個proposal來討論SQL on streams的語法和語義。社區(qū)將Calcite的stream SQL描述為標準SQL的擴展而不是另外的 SQL-like語言。這有很多好處,首先,熟悉SQL標準的人能夠在不學習新語法的情況下分析流數據。靜態(tài)表和流表的查詢幾乎相同,可以輕松地移植。此外,可以同時在靜態(tài)表和流表上進行查詢,這和flink的愿景是一樣的,將批處理看做特殊的流處理(批看作是有限的流)。最后,使用標準SQL進行流處理意味著有很多成熟的工具支持。
下面的example展示了如何用SQL和Table API進行滑動窗口查詢:
SQL
SELECT STREAM TUMBLE_END(time, INTERVAL "1" DAY) AS day, location AS room, AVG((tempF - 32) * 0.556) AS avgTempC FROM sensorData WHERE location LIKE "room%" GROUP BY TUMBLE(time, INTERVAL "1" DAY), location
Table API
val avgRoomTemp: Table = tableEnv.ingest("sensorData") .where("location.like("room%")) .partitionBy("location) .window(Tumbling every Days(1) on "time as "w) .select("w.end, "location, , (("tempF - 32) * 0.556).avg as "avgTempCs)Table API的現狀 Batch SQL & Table API 支持:
Selection, Projection, Sort, Inner & Outer Joins, Set operations
Windows for Slide, Tumble, Session
Streaming Table API 支持:Selection, Projection, Union
Windows for Slide, Tumble, Session
Streaming SQL:Selection, Projection, Union, Tumble
Streaming SQL案例 持續(xù)的ETL和數據導入獲取流式數據,然后轉換這些數據(歸一化,聚合...),將其寫入其他系統(File,Kafka,DBMS)。這些query的結果通常會存儲到log-style的系統。
實時的Dashboards 和 報表獲取流式數據,然后對數據進行聚合來支持在線系統(dashboard,推薦)或者數據分析系統(Tableau)。通常結果被寫到k-v存儲中(Cassandra,Hbase,可查詢的Flink狀態(tài)),建立索引(Elasticsearch)或者DBMS(MySQL,PostgreSQL...)。這些查詢通常可以被更新,改進。
即席分析針對流數據的即席查詢,以實時的方式進行分析和瀏覽數據。查詢結果直接顯示在notebook(Apache Zeppelin)中。
Flink社區(qū)還提出來和數據庫中Materialized View很相似的Dynamic table 動態(tài)表概念,將在以后的版本中支持,具體細節(jié)將另開文章解釋。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66881.html
摘要:每個在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。在這些中處理的數據類型在相應的編程語言中表示為類。該是為中心的聲明性表,其可被動態(tài)地改變的表表示流時。這種抽象在語義和表達方面類似于,但是將程序表示為查詢表達式。 1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。 showImg(ht...
摘要:通過狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應用程序部署后應捕獲的業(yè)務功能。本地恢復通過擴展的調度來完成本地恢復功能,以便在恢復時考慮先前的部署位置。此功能大大提高了恢復速度。問題導讀1.Flink1.7開始支持Scala哪個版本?2.Flink1.7狀態(tài)演變在實際生產中有什么好處?3.支持SQL/Table API中的富集連接可以做那些事情?4.Flink1.7新增了哪些連接器Ap...
摘要:擴展庫還包括用于復雜事件處理,機器學習,圖形處理和兼容性的專用代碼庫。事件時間機制使得那些事件無序到達甚至延遲到達的數據流能夠計算出精確的結果。負責接受用戶的程序代碼,然后創(chuàng)建數據流,將數據流提交給以便進一步執(zhí)行。 showImg(https://segmentfault.com/img/remote/1460000016902812); 前言 Flink 是一種流式計算框架,為什么我...
閱讀 3571·2023-04-26 02:05
閱讀 2011·2021-11-19 11:30
閱讀 4219·2021-09-30 09:59
閱讀 3178·2021-09-10 10:51
閱讀 2610·2021-09-01 10:30
閱讀 1485·2021-08-11 11:20
閱讀 2619·2019-08-30 15:54
閱讀 568·2019-08-30 10:49