摘要:本文對使用分析文本數據的實踐過程中遇到的一些問題和優化經驗進行了總結。作為前提,讀者需要詳細了解讀取文本數據的一些基礎知識,對這篇官方文檔訪問非結構化數據最好有過實踐經驗。這需要針對含有非英文字符的文本數據做一些特殊處理。
摘要: 1. 本文背景 很多行業的信息系統中,例如金融行業的信息系統,相當多的數據交互工作是通過傳統的文本文件進行交互的。此外,很多系統的業務日志和系統日志由于各種原因并沒有進入ELK之類的日志分析系統,也是以文本文件的形式存在的。
本文背景
很多行業的信息系統中,例如金融行業的信息系統,相當多的數據交互工作是通過傳統的文本文件進行交互的。此外,很多系統的業務日志和系統日志由于各種原因并沒有進入ELK之類的日志分析系統,也是以文本文件的形式存在的。隨著數據量的指數級增長,對超大文本文件的分析越來越成為挑戰。好在阿里云的MaxCompute產品從2.0版本開始正式支持了直接讀取并分析存儲在OSS上的文本文件,可以用結構化查詢的方式去分析非結構化的數據。
本文對使用MaxCompute分析OSS文本數據的實踐過程中遇到的一些問題和優化經驗進行了總結。作為前提,讀者需要詳細了解MaxCompute讀取OSS文本數據的一些基礎知識,對這篇官方文檔 《訪問 OSS 非結構化數據》最好有過實踐經驗。本文所描述的內容主要是針對這個文檔中提到的自定義Extractor做出的一些適配和優化。
場景實踐
2.1 場景一:分析zip壓縮后的文本文件
場景說明
很多時候我們會對歷史的文本數據進行壓縮,然后上傳到OSS上進行歸檔,那么如果要對這部分數據導入MaxCompute進行離線分析,我們可以自定義Extractor讓MaxCompute直接讀取OSS上的歸檔文件,避免了把歸檔文件下載到本地、解壓縮、再上傳回OSS這樣冗長的鏈路。
實現思路
如 《訪問 OSS 非結構化數據》文檔中所述,MaxCompute讀取OSS上的文本數據本質上是讀取一個InputStream流,那么我們只要構造出適當的歸檔字節流,就可以直接獲取這個InputStream中的數據了。
以Zip格式的歸檔文件為例,我們可以參考 DataX 中關于讀取OSS上Zip文件的源碼,構造一個Zip格式的InputStream,代碼見 ZipCycleInputStream.java 。構造出這個Zip格式的InputStream后,在自定義Extractor中獲取文件流的部分就可以直接使用了,例如:
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = inputs.next(); // ...... ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); // ...... }
優化經驗
大家可能知道,MaxCompute中進行批量計算的時候,可以通過設置 odps.stage.mapper.split.size 這個參數來調整數據分片的大小,從而影響到執行計算任務的Mapper的個數,在一定程度上提高Mapper的個數可以增加計算的并行度,進而提高計算效率 (但也不是說Mapper個數越多越好,因為這樣可能會造成較長時間的資源等待,或者可能會造成長尾的后續Reducer任務,反而降低整體的計算效率) 。
同樣道理,對OSS上的文本文件進行解析的時候,也可以通過設置 odps.sql.unstructured.data.split.size 這個參數來達到調整Mapper個數的目的 (注意這個參數可能需要提工單開通使用權限):
set odps.sql.unstructured.data.split.size=16;
上述設定的含義是,將OSS上的文件拆分為若干個16M左右大小的分片,讓MaxCompute盡力做到每個分片啟動一個Mapper任務進行計算——之所以說是“盡力做到”,是因為MaxCompute默認不會對單個文件進行拆分及分片處理(除非設定了其他參數,我們后面會講到),也就是說,如果把單個分片按照上面的設定為16M,而OSS上某個文件大小假設為32M,則MaxCompute仍然會把這個文件整體(即32M)的數據量作為一個分片進行Mapper任務計算。
注意點
我們在這個場景中處理的是壓縮后的文件,而InputStream處理的字節量大小是不會因壓縮而變小的。舉個例子,假設壓縮比為1:10,則上述這個32M的壓縮文件實際代表了320M的數據量,即MaxCompute會把1個Mapper任務分配給這320M的數據量進行處理;同理假設壓縮比為1:20,則MaxCompute會把1個Mapper任務分配給640M的數據量進行處理,這樣就會較大的影響計算效率。因此,我們需要根據實際情況調整分片參數的大小,并盡量把OSS上的壓縮文件大小控制在一個比較小的范圍內,從而可以靈活配置分片參數,否則分片參數的值會因為文件太大并且文件不會被拆分而失效。
2.2 場景二:過濾文本文件中的特定行
場景說明
對于一些業務數據文件,特別是金融行業的數據交換文件,通常會有文件頭或文件尾的設定要求,即文件頭部的若干行數據是一些元數據信息,真正要分析的業務數據需要把這些元信息的行過濾掉,只分析業務數據部分的行,否則執行結構化查詢的SQL語句的時候必然會造成任務失敗。
實現思路
在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,對 readNextLine() 方法進行一些改造,對讀取的每一個文件,即每個 currentReader 讀取下一行的時候,記錄下來當前處理的行數,用這個行數判斷是否到達了業務數據行,如果未到業務數據行,則繼續讀取下一條記錄,如果已經到達數據行,則將該行內容返回處理;而當跳轉到下一個文件的時候,將 該行數值重置。
代碼示例:
private String readNextLine() throws IOException { if (firstRead) { firstRead = false; currentReader = moveToNextStream(); if (currentReader == null) { return null; } } // 讀取行級數據 while (currentReader != null) { String line = currentReader.readLine(); if (line != null) { if (currentLine < dataLineStart) { // 若當前行小于數據起始行,則繼續讀取下一條記錄 currentLine++; continue; } if (!"EOF".equals(line)) { // 若未到達文件尾則將該行內容返回,若到達文件尾則直接跳到下個文件 return line; } } currentReader = moveToNextStream(); currentLine = 1; } return null; }
此處 dataLineStart 表示業務數據的起始行,可以通過 DataAttributes 在建立外部表的時候從外部作為參數傳入。當然也可以隨便定義其他邏輯來過濾掉特定行,比如本例中的對文件尾的“EOF”行進行了簡單的丟棄處理。
2.3 場景三:忽略文本中的空行
場景說明
在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,已可以應對大多數場景下的文本數據處理,但有時候在業務數據文本中會存在一些空行,這些空行可能會造成程序的誤判,因此我們需要忽略掉這些空行,讓程序繼續分析處理后面有內容的行。
實現思路
類似于上述 場景二 ,只需要判斷為空行后,讓程序繼續讀取下一行文本即可。
代碼示例:
public Record extract() throws IOException {
String line = readNextLine(); if (line == null) { return null;// 返回null標志已經讀取完成 } while ("".equals(line.trim()) || line.length() == 0 || line.charAt(0) == " " // 遇到空行則繼續處理 || line.charAt(0) == " ") { line = readNextLine(); if (line == null) return null; } return textLineToRecord(line);
}
2.4 場景四:選擇OSS上文件夾下的部分文件進行處理
場景說明
閱讀 《訪問 OSS 非結構化數據》文檔可知,一張MaxCompute的外部表連接的是OSS上的一個文件夾(嚴格來說OSS沒有“文件夾”這個概念,所有對象都是以Object來存儲的,所謂的文件夾其實就是在OSS創建的一個字節數為0且名稱以“/”結尾的對象。MaxCompute建立外部表時連接的是OSS上這樣的以“/”結尾的對象,即連接一個“文件夾”),在處理外部表時,默認會對該文件夾下 所有的文件 進行解析處理。該文件夾下所有的文件集合即被封裝為 InputStreamSet ,然后通過其 next() 方法來依次獲得每一個InputStream流、即每個文件流。
但有時我們可能會希望只處理OSS上文件夾下的 部分 文件,而不是全部文件,例如只分析那些文件名中含有“2018_”字樣的文件,表示只分析2018年以來的業務數據文件。
實現思路
在獲取到每一個InputStream的時候,通過 SourceInputStream 類的 getFileName() 方法獲取正在處理的文件流所代表的文件名,然后可以通過正則表達式等方式判斷該文件流是否為所需要處理的文件,如果不是則繼續調用 next() 方法來獲取下一個文件流。
代碼示例:
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = null; while ((stream = inputs.next()) != null) { String fileName = stream.getFileName(); System.out.println("========inputs.next():" + fileName + "========"); if (patternModel.matcher(fileName).matches()) { System.out.println(String .format("- match fileName:[%s], pattern:[%s]", fileName, patternModel .pattern())); ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); } else { System.out.println(String.format( "-- discard fileName:[%s], pattern:[%s]", fileName, patternModel.pattern())); continue; } } return null; }
本例中的 patternModel 為通過 DataAttributes 在建立外部表的時候從外部作為參數傳入的正則規則。
寫到這里可能有讀者會問,如果一個文件夾下有很多文件,比如上萬個文件,整個遍歷一遍后只選擇一小部分文件進行處理這樣的方式會不會效率太低了?其實大可不必擔心,因為相對于MaxCompute對外部表執行批量計算的過程,循環遍歷文件流的時間消耗是非常小的,通常情況下是不會影響批量計算任務的。
2.5 場景五:針對單個大文件進行拆分
場景說明
在 場景一 中提到,要想提高計算效率,我們需要調整 odps.sql.unstructured.data.split.size 參數值來增加Mapper的并行度,但是對于單個大文件來講,MaxCompute默認是不進行拆分的,也就是說OSS上的單個大文件只會被分配給一個Mapper任務進行處理,如果這個文件非常大的話,處理效率將會及其低下,我們需要一種方式來實現對單個文件進行拆分,使其可以被多個Mapper任務進行并行處理。
實現思路
仍然是要依靠調整 odps.sql.unstructured.data.split.size 參數來增加Mapper的并行度,并且設定 odps.sql.unstructured.data.single.file.split.enabled 參數來允許拆分單個文件 (同odps.sql.unstructured.data.split.size,該參數也可能需要提工單申請使用權限) ,例如:
set odps.sql.unstructured.data.split.size=128; set odps.sql.unstructured.data.single.file.split.enabled=true;
設置好這些參數后,就需要編寫特定的Reader類來進行單個大文件的拆分了。
核心的思路是,根據 odps.sql.unstructured.data.split.size 所設定的值,大概將文件按照這個大小拆分開,但是拆分點極大可能會切在一條記錄的中間,這時就需要調整字節數,向前或向后尋找換行符,來保證最終的切分點落在一整條記錄的尾部。具體的實現細節相對來講比較復雜,可以參考在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 來進行分析。
注意點
在計算字節數的過程中,可能會遇到非英文字符造成計算切分點的位置計算不準確,進而出現讀取的字節流仍然沒有把一整行覆蓋到的情況。這需要針對含有非英文字符的文本數據做一些特殊處理。
代碼示例:
@Override public int read(char[] cbuf, int off, int len) throws IOException { if (this.splitReadLen >= this.splitSize) { return -1; } if (this.splitReadLen + len >= this.splitSize) { len = (int) (this.splitSize - this.splitReadLen); } int readSize = this.internalReader.read(cbuf, off, len); int totalBytes = 0; for (char ch : cbuf) { String str = String.valueOf(ch); byte[] bytes = str.getBytes(charset); totalBytes += bytes.length; } this.splitReadLen += totalBytes; return readSize; }
其他建議
在編寫自定義Extractor的程序中,適當加入System.out作為日志信息輸出,這些日志信息會在MaxCompute執行時輸出在LogView的視圖中,對于調試過程和線上問題排查過程非常有幫助。
上文中提到通過調整 odps.sql.unstructured.data.split.size 參數值來適當提高Mapper任務的并行度,但是并行度并不是越高越好,具體什么值最合適是與OSS上的文件大小、總數據量、MaxCompute產品自身的集群狀態緊密聯系在一起的,需要多次調試,并且可能需要與 odps.stage.reducer.num、odps.sql.reshuffle.dynamicpt、odps.merge.smallfile.filesize.threshold 等參數配合使用才能找到最優值。并且由于MaxCompute產品自身的集群狀態也是很重要的因素,可能今天申請500個Mapper資源是很容易的事情,過幾個月就變成經常需要等待很長時間才能申請到,這就需要持續關注任務的執行時間并及時調整參數設定。
外部表的讀取和解析是依靠Extractor對文本的解析來實現的,因此在執行效率上是遠不能和MaxCompute的普通表相比的,所以在需要頻繁讀取和分析OSS上的文本文件的情況下,建議將OSS文件先 INSERT OVERWRITE 到MaxCompute中字段完全對等的一張普通表中,然后針對普通表進行分析計算,這樣通常會獲得更好的計算效率。
原文鏈接
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71255.html
摘要:作為阿里云大數據旗艦產品,的級別性能處理達到了全球領先性,被評為全球云端數據倉庫領導者。天弘基金天弘基金旗下的余額寶,是中國規模最大的貨幣基金。場景二阿里云產品消費賬單分析準備工作完成案例中準備工作步驟。 摘要: 一、 MaxCompute是什么 你的OSS數據是否作堆積在一旁沉睡已久存儲成本變為企業負擔你是否想喚醒沉睡的數據驅動你的業務前行MaxCompute可以幫助你高效且低成本的...
摘要:在之前的文章中,我們已經介紹過怎樣在上對存儲在上的文本,音頻,圖像等格式的數據,以及的數據進行計算處理。外部表的必須與具體上存儲存儲數據的相符合。唯一不同的只是在內部計算引擎將從上去讀取對應的數據來進行處理。 前言MaxCompute作為使用最廣泛的大數據平臺,內部存儲的數據以EB量級計算。巨大的數據存儲量以及大規模計算下高性能數據讀寫的需求,對于MaxCompute提出了各種高要求及...
閱讀 2376·2021-09-22 15:15
閱讀 640·2021-09-02 15:11
閱讀 1784·2021-08-30 09:48
閱讀 1884·2019-08-30 15:56
閱讀 1480·2019-08-30 15:52
閱讀 2042·2019-08-30 15:44
閱讀 431·2019-08-29 16:29
閱讀 1538·2019-08-29 11:06