国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

TiDB 源碼閱讀系列文章(二十二)Hash Aggregation

kelvinlee / 612人閱讀

摘要:若聚合函數不可以下推,則保持不變。目前當且僅當兩種情況下不可以并行執行存在某個聚合函數參數為時。

作者:徐懷宇
聚合算法執行原理

在 SQL 中,聚合操作對一組值執行計算,并返回單個值。TiDB 實現了 2 種聚合算法:Hash Aggregation 和 Stream Aggregation。

我們首先以 AVG 函數為例(案例參考 Stack Overflow),簡述這兩種算法的執行原理。

假設表 t 如下:

列 a 列 b
1 9
1 -8
2 -7
2 6
1 5
2 4

SQL: select avg(b) from t group by a, 要求將表 t 的數據按照 a 的值分組,對每一組的 b 值計算平均值。不管 Hash 還是 Stream 聚合,在 AVG 函數的計算過程中,我們都需要維護 2 個中間結果變量 sumcount。Hash 和 Stream 聚合算法的執行原理如下。

Hash Aggregate 的執行原理

在 Hash Aggregate 的計算過程中,我們需要維護一個 Hash 表,Hash 表的鍵為聚合計算的 Group-By 列,值為聚合函數的中間結果 sumcount。在本例中,鍵為 列 a 的值,值為 sum(b)count(b)。

計算過程中,只需要根據每行輸入數據計算出鍵,在 Hash 表中找到對應值進行更新即可。對本例的執行過程模擬如下。

輸入數據 a b Hash 表 [key] (sum, count)
1 9 [1] (9, 1)
1 -8 [1] (1, 2)
2 -7 [1] (1, 2) ?[2] (-7, 1)
2 6 [1] (1, 2) ?[2] (-1, 2)
1 5 [1] (6, 3) ?[2] (-1, 2)
2 4 [1] (6, 3) ?[2] (3, 3)

輸入數據輸入完后,掃描 Hash 表并計算,便可以得到最終結果:

Hash 表 avg(b)
[1] (6, 3) 2
[2] (3, 3) 1
Stream Aggregation 的執行原理

Stream Aggregate 的計算需要保證輸入數據按照 Group-By 列有序。在計算過程中,每當讀到一個新的 Group 的值或所有數據輸入完成時,便對前一個 Group 的聚合最終結果進行計算。

對于本例,我們首先對輸入數據按照 a 列進行排序。排序后,本例執行過程模擬如下。

輸入數據 是否為新 Group 或所有數據輸入完成 (sum, count) avg(b)
1 9 (1, 9) 前一個 Group 為空,不進行計算
1 -8 (2, 1)
1 5 (3, 6)
2 -7 (1, -7) 2
2 6 (2, -1)
2 4 (3, 3)
1

因為 Stream Aggregate 的輸入數據需要保證同一個 Group 的數據連續輸入,所以 Stream Aggregate 處理完一個 Group 的數據后可以立刻向上返回結果,不用像 Hash Aggregate 一樣需要處理完所有數據后才能正確的對外返回結果。當上層算子只需要計算部分結果時,比如 Limit,當獲取到需要的行數后,可以提前中斷 Stream Aggregate 后續的無用計算。

Group-By 列上存在索引時,由索引讀入數據可以保證輸入數據按照 Group-By 列有序,此時同一個 Group 的數據連續輸入 Stream Aggregate 算子,可以避免額外的排序操作。

TiDB 聚合函數的計算模式

由于分布式計算的需要,TiDB 對于聚合函數的計算階段進行劃分,相應定義了 5 種計算模式:CompleteMode,FinalMode,Partial1Mode,Partial2Mode,DedupMode。不同的計算模式下,所處理的輸入值和輸出值會有所差異,如下表所示:

AggFunctionMode 輸入值 輸出值
CompleteMode 原始數據 最終結果
FinalMode 中間結果 最終結果
Partial1Mode 原始數據 中間結果
Partial2Mode 中間結果 進一步聚合的中間結果
DedupMode 原始數據 去重后的原始數據

以上文提到的 select avg(b) from t group by a 為例,通過對計算階段進行劃分,可以有多種不同的計算模式的組合,如:

CompleteMode

此時 AVG 函數的整個計算過程只有一個階段,如圖所示:

Partial1Mode --> FinalMode

此時我們將 AVG 函數的計算過程拆成兩個階段進行,如圖所示:

除了上面的兩個例子外,還可能有如下的幾種計算方式:

聚合被下推到 TiKV 上進行計算(Partial1Mode),并返回經過預聚合的中間結果。為了充分利用 TiDB server 所在機器的 CPU 和內存資源,加快 TiDB 層的聚合計算,TiDB 層的聚合函數計算可以這樣進行:Partial2Mode --> FinalMode。

當聚合函數需要對參數進行去重,也就是包含 DISTINCT 屬性,且聚合算子因為一些原因不能下推到 TiKV 時,TiDB 層的聚合函數計算可以這樣進行:DedupMode --> Partial1Mode --> FinalMode。

聚合函數分為幾個階段執行, 每個階段對應的模式是什么,是否要下推到 TiKV,使用 Hash 還是 Stream 聚合算子等都由優化器根據數據分布、估算的計算代價等來決定。

TiDB 并行 Hash Aggregation 的實現 如何構建 Hash Aggregation 執行器

構建邏輯執行計劃 時,會調用 NewAggFuncDesc 將聚合函數的元信息封裝為一個 AggFuncDesc。 其中 AggFuncDesc.RetTp 由 AggFuncDesc.typeInfer 根據聚合函數類型及參數類型推導而來;AggFuncDesc.Mode 統一初始化為 CompleteMode。

構建物理執行計劃時,PhysicalHashAggPhysicalStreamAggattach2Task 方法會根據當前 task 的類型嘗試進行下推聚合計算,如果 task 類型滿足下推的基本要求,比如 copTask,接著會調用 newPartialAggregate 嘗試將聚合算子拆成 TiKV 上執行的 Partial 算子和 TiDB 上執行的 Final 算子,其中 AggFuncToPBExpr 函數用來判斷某個聚合函數是否可以下推。若聚合函數可以下推,則會在 TiKV 中進行預聚合并返回中間結果,因此需要將 TiDB 層執行的 Final 聚合算子的 AggFuncDesc.Mode 修改為 FinalMode,并將其 AggFuncDesc.Args 修改為 TiKV 預聚合后返回的中間結果,TiKV 層的 Partial 聚合算子的 AggFuncDesc 也需要作出對應的修改,這里不再詳述。若聚合函數不可以下推,則 AggFuncDesc.Mode 保持不變。

構建 HashAgg 執行器時,首先檢查當前 HashAgg 算子是否可以并行執行。目前當且僅當兩種情況下 HashAgg 不可以并行執行:

存在某個聚合函數參數為 DISTINCT 時。TiDB 暫未實現對 DedupMode 的支持,因此對于含有 DISTINCT 的情況目前僅能單線程執行。

系統變量 tidb_hashagg_partial_concurrencytidb_hashagg_final_concurrency 被同時設置為 1 時。這兩個系統變量分別用來控制 Hash Aggregation 并行計算時候,TiDB 層聚合計算 partial 和 final 階段 worker 的并發數。當它們都被設置為 1 時,選擇單線程執行。

HashAgg 算子可以并行執行,使用 AggFuncDesc.Split 根據 AggFuncDesc.Mode 將 TiDB 層的聚合算子的計算拆分為 partial 和 final 兩個階段,并分別生成對應的 AggFuncDesc,設為 partialAggDescfinalAggDesc。若 AggFuncDesc.Mode == CompleteMode,則將 TiDB 層的計算階段拆分為 Partial1Mode --> FinalMode;若 AggFuncDesc.Mode == FinalMode,則將 TiDB 層的計算階段拆分為 Partial2Mode --> FinalMode。進一步的,我們可以根據 partialAggDescfinalAggDesc 分別 構造出對應的執行函數。

并行 Hash Aggregation 執行過程詳述

TiDB 的并行 Hash Aggregation 算子執行過程中的主要線程有:Main Thead,Data Fetcher,Partial Worker,和 Final Worker:

Main Thread 一個:

啟動 Input Reader,Partial Workers 及 Final Workers

等待 Final Worker 的執行結果并返回

Data Fetcher 一個:

按 batch 讀取子節點數據并分發給 Partial Worker

Partial Worker 多個:

讀取 Data Fetcher 發送來的數據,并做預聚合

將預聚合結果根據 Group 值 shuffle 給對應的 Final Worker

Final Worker 多個:

讀取 PartialWorker 發送來的數據,計算最終結果,發送給 Main Thread

Hash Aggregation 的執行階段可分為如下圖所示的 5 步:

啟動 Data Fetcher,Partial Workers 及 Final Workers。

這部分工作由 prepare4Parallel 函數完成。該函數會啟動一個 Data Fetcher,多個 Partial Worker 以及 多個 Final Worker。Partial Worker 和 Final Worker 的數量可以分別通過 tidb_hashgg_partial_concurrencytidb_hashagg_final_concurrency 系統變量進行控制,這兩個系統變量的默認值都為 4。

DataFetcher 讀取子節點的數據并分發給 Partial Workers。

這部分工作由 fetchChildData 函數完成。

Partial Workers 預聚合計算,及根據 Group Key shuffle 給對應的 Final Workers。

這部分工作由 HashAggPartialWorker.run 函數完成。該函數調用 updatePartialResult 函數對 DataFetcher 發來數據執行 預聚合計算,并將預聚合結果存儲到 partialResultMap 中。其中 partialResultMap 的 key 為根據 Group-By 的值 encode 的結果,value 為 PartialResult 類型的數組,數組中的每個元素表示該下標處的聚合函數在對應 Group 中的預聚合結果。shuffleIntermData 函數完成根據 Group 值 shuffle 給對應的 Final Worker。

Final Worker 計算最終結果,發送給 Main Thread。

這部分工作由 HashAggFinalWorker.run 函數完成。該函數調用 consumeIntermData 函數 接收 PartialWorkers 發送來的預聚合結果,進而 合并 得到最終結果。getFinalResult 函數完成發送最終結果給 Main Thread。

Main Thread 接收最終結果并返回。

TiDB 并行 Hash Aggregation 的性能提升

此處以 TPC-H query-17 為例,測試并行 Hash Aggregation 相較于單線程計算時的性能提升。引入并行 Hash Aggregation 前,它的計算瓶頸在 HashAgg_35。

該查詢執行計劃如下:

在 TiDB 中,使用 EXPLAIN ANALYZE 可以獲取 SQL 的執行統計信息。因篇幅原因此處僅貼出 TPC-H query-17 部分算子的 EXPLAIN ANALYZE 結果。

HashAgg 單線程計算時:

查詢總執行時間 23 分 24 秒,其中 HashAgg 執行時間約 17 分 9 秒。

HashAgg 并行計算時(此時 TiDB 層 Partial 和 Final 階段的 worker 數量都設置為 16):

總查詢時間 8 分 37 秒,其中 HashAgg 執行時間約 1 分 4 秒。

并行計算時,Hash Aggregation 的計算速度提升約 16 倍。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/17856.html

相關文章

  • TiDB 源碼閱讀系列文章二十)Table Partition

    摘要:部分主要流程如下把上文提到語法解析階段會把語句中相關信息轉換成然后負責把結構轉換即的元信息。最后把的元信息追加到的元信息中,具體實現在這里。會把要刪除的分區從元信息刪除掉,刪除前會做的檢查。 作者:肖亮亮 Table Partition 什么是 Table Partition Table Partition 是指根據一定規則,將數據庫中的一張表分解成多個更小的容易管理的部分。從邏輯上看...

    K_B_Z 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<