摘要:會盡最大可能容錯,保證數據上傳上去。多線程上傳示例多線程上傳時,每個線程只需要打開一個往里面寫數據就行了。多個進程共享由于一個的上傳狀態是通過維護一個實現的,對于多線程程序來講,通過鎖很容易實現資源的分配。
摘要: MaxCompute 的數據上傳接口(Tunnel)定義了數據 block 的概念:一個 block 對應一個 http request,多個 block 的上傳可以并發而且是原子的,一次同步請求要么成功要么失敗,不會污染其他的 block。這種設計對于服務端來講十分簡潔,但是也把記錄狀態做 fa.
本文用到的
阿里云數加-大數據計算服務MaxCompute產品地址:https://www.aliyun.com/produc...
MaxCompute 的數據上傳接口(Tunnel)定義了數據 block 的概念:一個 block 對應一個 http request,多個 block 的上傳可以并發而且是原子的,一次同步請求要么成功要么失敗,不會污染其他的 block。這種設計對于服務端來講十分簡潔,但是也把記錄狀態做 failover 的工作交給了客戶端。
用戶在使用 Tunnel SDK 編程時,需要對 block 這一層的語義進行認知,并且驅動數據上傳的整個過程[1],并且自己進行容錯,畢竟『網絡錯誤是正常而不是異?!?。由于用戶文檔中并沒有強調這一點的重要性,導致很多用戶踩了坑,一種常見的出錯場景是,當客戶端寫數據的速度過慢,兩次 write 的間隔超時[2],導致整個 block 上傳失敗。
High Level API
MaxCompute Java SDK 在 0.21.3-public 之后新增了 BufferredWriter 這個更高層的 API,簡化了數據上傳的過程,并且提供了容錯的功能。 BufferedWriter 對用戶隱藏了 block 這個概念,從用戶角度看,就是在 session 上打開一個 writer 然后往里面寫記錄即可:
RecordWriter writer = null; try { int i = 0; writer = uploadSession.openBufferedWriter(); Record product = uploadSession.newRecord(); for (String item : items) { product.setString("name", item); product.setBigint("id", i); writer.write(product); i += 1; } } finally { if (writer != null) { writer.close(); } } uploadSession.commit();
具體實現時 BufferedWriter 先將記錄緩存在客戶端的緩沖區中,并在緩沖區填滿之后打開一個 http 連接進行上傳。BufferedWriter 會盡最大可能容錯,保證數據上傳上去。
由于屏蔽了底層細節,這個接口可能并不適合數據預劃分、斷點續傳、分批次上傳等需要細粒度控制的場景。
多線程上傳示例
多線程上傳時,每個線程只需要打開一個 writer 往里面寫數據就行了。
class UploadThread extends Thread { private UploadSession session; private static int RECORD_COUNT = 1200; public UploadThread(UploadSession session) { this.session = session; } @Override public void run() { RecordWriter writer = up.openBufferedWriter(); Record r = up.newRecord(); for (int i = 0; i < RECORD_COUNT; i++) { r.setBigint(0, i); writer.write(r); } writer.close(); } }; public class Example { public static void main(String args[]) { // 初始化 MaxCompute 和 tunnel 的代碼 TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName); UploadThread t1 = new UploadThread(up); UploadThread t2 = new UploadThread(up); t1.start(); t2.start(); t1.join(); t2.join(); uploadSession.commit(); }
更多控制
重試策略
由于底層在上傳出錯時會回避一段固定的時間并進行重試,但如果你的程序不想花太多時間在重試上,或者你的程序位于一個極其惡劣的網絡環境中,為此 TunnelBufferedWriter 允許用戶配置重試策略。
用戶可以選擇三種重試回避策略:指數回避(EXPONENTIAL_BACKOFF)、線性時間回避(LINEAR_BACKOFF)、常數時間回避(CONSTANT_BACKOFF)。
例如下面這段代碼可以將,write 的重試次數調整為 6,每一次重試之前先分別回避 4s、8s、16s、32s、64s 和 128s(從 4 開始的指數遞增的序列)。
RetryStrategy retry = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF) writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter(); writer.setRetryStrategy(retry);
緩沖區控制
如果你的程序對 JVM 的內存有嚴格的要求,可以通過下面這個接口修改緩沖區占內存的字節數(bytes):
writer.setBufferSize(1024*1024);
默認配置每一個 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上傳一個 block 的數據[3]。
多個進程共享 Session
由于一個 Session 的上傳狀態是通過維護一個 block list 實現的,對于多線程程序來講,通過鎖很容易實現資源的分配。但對于兩個進程空間里的程序想要復用一個 Session 時,必須通過一種機制對資源進行隔離。
具體地,在 getUploadSession 的時候,必須指定這個共享這個 Session 的進程數目,以及一個用來區分進程的 global id:
//程序1:這個 session 將被兩個 writer 共享,我是其中第 0 個 TableTunnel.UploadSession up = tunnel.getUploadSession(projectName, tableName, sid, 2, 0); writer = session.openBufferedWriter(); //程序1:這個 session 將被兩個 writer 共享,我是其中第 1 個 TableTunnel.UploadSession up = tunnel.getUploadSession(projectName, tableName, sid, 2, 1); writer = session.openBufferedWriter();
Notes
[1] 一次完整的上傳流程通常包括以下步驟:
先對數據進行劃分
為每個數據塊指定 block id,即調用 openRecordWriter(id)
然后用一個或多個線程分別將這些 block 上傳上去
并在某個 block 上傳失敗以后,需要對整個 block 進行重傳
在所有 block 都上傳以后,向服務端提供上傳成功的 blockid list 進行校驗,即調用 session.commit([1,2,3,...])
[2] 因為使用長連接,服務端有計時器判斷是否客戶端是否 alive
[3] block 在服務端有 20000 個的數量上限,如果 BufferSize 設得太小會導致 20000 個 block 很快被用光
[4] Session的有效期為24小時,超過24小時會導致數據上傳失敗
原文鏈接
閱讀更多干貨好文,請關注掃描以下二維碼:
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71017.html
摘要:摘要北京云棲大會上阿里云發布了最新的功能,萬眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過進行開發。注冊函數在腳本中編輯試用好了,一個簡單完整的通過開發實踐分享完成。 摘要: 2017/12/20 北京云棲大會上阿里云MaxCompute發布了最新的功能Python UDF,萬眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過Studio進行Python u...
閱讀 1481·2021-11-17 09:33
閱讀 1260·2021-10-11 10:59
閱讀 2892·2021-09-30 09:48
閱讀 1905·2021-09-30 09:47
閱讀 3024·2019-08-30 15:55
閱讀 2337·2019-08-30 15:54
閱讀 1493·2019-08-29 15:25
閱讀 1646·2019-08-29 10:57