點擊上方“IT那活兒”公眾號,關注后了解更多內容,不管IT什么活兒,干就完了!!!
流式查詢的觸發器的設置定義了流數據處理的時間,無論查詢時作為微批處理還是作為連續查詢執行。
微批觸發類型
例子:
連續處理模式(Continuous processing)
Continuous processing連續處理模式是Spark 2.3中引入的一種新的實驗性流式執行模式,可實現小于1毫秒的端到端延遲,并至少保證一次容錯。與默認的微批處理引擎相比,該引擎可以實現一次保證,但最多可實現約100ms的延遲。
對于某些類型的查詢(如下所述),您可以選擇在不修改應用程序邏輯(即不更改數據幀/數據集操作)的情況下選擇何種模式執行。
如果要在連續處理模式下執行查詢,只需要定義一個連續觸發器,并將檢查點間隔作為參數。
1秒的檢查點間隔意味著連續處理引擎將每秒記錄查詢的進度。生成的檢查點采用與微批處理引擎兼容的格式,因此可以使用任何觸發器重新啟動任何查詢。
例如,以微批處理模式啟動的受支持查詢可以在連續模式下重新啟動,反之亦然。請注意,無論何時切換到連續模式,您都將獲得至少一次容錯保證。
自spark2.4,連續處理模式下僅支持以下查詢類型:
操作(operation):在連續處理模式下僅支持類似map類型操作,像select, map, flatMap, mapPartitions,where, filter等,除了聚合函數支持所有SQL函數。
數據源(source):kafka source和rate source(用于測試)。
接收器(sink):Kafka sink,.Memory sink,Console sink。
雖然控制臺接收器適合測試,但最好使用Kafka作為源和接收器來觀察端到端的低延遲處理,因為這允許引擎在輸入主題中的輸入數據可用的毫秒內處理數據并在輸出主題中提供結果。
警告:
1)Continuous processing engine啟動多個長時間運行的任務,這些任務不斷地從源讀取數據、處理數據并不斷地向接收器寫入數據。查詢所需的任務數量取決于查詢可以并行地從源中讀取多少分區。
因此,在啟動連續處理查詢之前,您必須確保集群中有足夠的內核來并行執行所有任務。
例如,如果您正在閱讀一個有10個分區的Kafka主題,那么集群必須至少有10個核心才能使查詢取得進展。
2)停止連續處理流可能會產生虛假的任務終止警告。可以放心地忽略這些。
3)當前沒有自動重試失敗的任務。任何故障都將導致查詢停止,需要從檢查點手動重新啟動查詢。
4)查詢運行后,無法修改多個配置。要更改它們,請放棄檢查點并啟動新查詢。這些配置包括:
spark.sql.shuffle.partitions
這是由于狀態的物理分區:狀態通過對鍵應用哈希函數進行分區,因此狀態的分區數應該保持不變。如果您希望為有狀態操作運行更少的任務,那么coalesce將有助于避免不必要的重新分區。合并后,除非發生另一次洗牌,否則(減少的)任務數將保持不變。
spark.sql.streaming.stateStore.providerClass,要正確讀取查詢的上一個狀態,狀態存儲提供程序的類應保持不變
spark.sql.streaming.multipleWatermarkPolicy
當查詢包含多個水印時,修改此項會導致水印值不一致,因此策略應保持不變。
流查詢管理
調用start()方法后將生成StreamingQuery對象,可用于監視可管理流查詢。
你可以再單個SparkSession中同時運行任意數量的查詢,這些查詢將同時運行并共享集群資源,您可以使用sparkSession.streams()獲取StreamingQueryManager,其可以管理當前活動的查詢。
流查詢監控
有多種方法可以監控流查詢,可以使用spark的Dropwizard Metrics支持,或者通過編程方式進入他們。
您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接獲取活動查詢的當前狀態和指標。
lastProgress()返回Scala和Java中的StreamingQueryProgress對象,以及Python中具有相同字段的字典。它包含關于流的最后一個觸發器中所取得的進展的所有信息—處理了哪些數據、處理速率、延遲等。
streamingQuery.recentProgress,它返回最后幾個進展的數組。
您還可以通過附加StreamingQueryListener(Scala/Java文檔)異步監視與SparkSession關聯的所有查詢。使用sparkSession.streams.attachListener()附加自定義StreamingQueryListener對象后,當查詢啟動和停止以及在活動查詢中取得進展時,將收到回調。
舉個例子:
啟用此配置后在SparkSession中啟動的所有查詢將通過Dropwizard向已配置的任何接收器(例如Ganglia、Graphite、JMX等)報告度量。
使用檢查點從故障中恢復
如果出現故障或故意關閉,可以恢復以前查詢的進度和狀態,并在停止時繼續。這是使用檢查點和預寫日志完成的。
您可以使用檢查點位置配置查詢,該查詢將所有進度信息(即每個觸發器中處理的偏移量范圍)和正在運行的聚合(例如,快速示例中的字數)保存到檢查點位置。此檢查點位置必須是HDFS兼容文件系統中的路徑。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/129471.html
摘要:典型實現不同的監控模塊,側重于不同領域,有著不同的職責。指標收集方面,支持多樣化的組件將被優先下使用。以上談了這么多,僅僅是聊了一下收集方面而已。 更多文章,請移步微信公眾號《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...監控是分布式系統的必備組件,能夠起到提前預警、問題排查、評估決策等功效,乃行走江湖、居家必備之良品。 監控系統概要 功能劃分...
摘要:典型實現不同的監控模塊,側重于不同領域,有著不同的職責。指標收集方面,支持多樣化的組件將被優先下使用。以上談了這么多,僅僅是聊了一下收集方面而已。 更多文章,請移步微信公眾號《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...監控是分布式系統的必備組件,能夠起到提前預警、問題排查、評估決策等功效,乃行走江湖、居家必備之良品。 監控系統概要 功能劃分...
摘要:的五種就緒狀態請求未初始化還沒有調用。請求已發送,正在處理中通常現在可以從響應中獲取內容頭。并且還提供了每個階段的事件如果請求中止,會觸發事件。網絡錯誤如太多重定向會阻止請求完成,會觸發事件。當等待服務器的響應時,對象會發生事件。 所謂web,即使你我素未謀面,便知志趣相投;足不出戶,亦知世界之大。 01 — 為什么攔截請求 現在的web應用,大都是通過請求(http)去獲取資源,拿到...
摘要:的五種就緒狀態請求未初始化還沒有調用。請求已發送,正在處理中通常現在可以從響應中獲取內容頭。并且還提供了每個階段的事件如果請求中止,會觸發事件。網絡錯誤如太多重定向會阻止請求完成,會觸發事件。當等待服務器的響應時,對象會發生事件。 所謂web,即使你我素未謀面,便知志趣相投;足不出戶,亦知世界之大。 01 — 為什么攔截請求 現在的web應用,大都是通過請求(http)去獲取資源,拿到...
閱讀 1346·2023-01-11 13:20
閱讀 1684·2023-01-11 13:20
閱讀 1132·2023-01-11 13:20
閱讀 1858·2023-01-11 13:20
閱讀 4100·2023-01-11 13:20
閱讀 2704·2023-01-11 13:20
閱讀 1385·2023-01-11 13:20
閱讀 3597·2023-01-11 13:20