摘要:的讀寫流模塊提供來創建行讀取流,即讀取文件的每一行作為持續的輸入數據模塊提供來創建寫入流,其返回的有和兩個方法,來完成流式的寫入與結束寫入。
前幾天接到任務要使用第三方API處理幾千張圖片,得到結果集。我的做法就是使用Rx.js結合node的讀寫流來完成數據讀入、接口請求、數據處理、數據寫入這些操作。本篇就來分享這個代碼和其邏輯。
Rx.js是什么Rx.js是一個響應式編程庫,能簡化事件/異步處理邏輯代碼。其視所有的事件/數據為__流__,提供各種流處理的operators,將輸入與輸出平滑的鏈接起來。可以類比為linux上的pipe操作符: ls | grep a*b | less。
Node的讀寫流readline模塊提供readline.createInterface來創建行讀取流,即讀取文件的每一行作為持續的輸入數據
fs模塊提供fs.createWriteStream來創建寫入流, 其返回的writer有write和end兩個方法,來完成流式的寫入與結束寫入。
第三方接口的使用情況并發數有限制,3個是出現其出現并發錯誤概率最低的最大并發數
接口請求過于頻繁,會較大概率出現連續的并發錯誤, 大概延遲400秒效果尚可
提供給第三方的圖片是鏈接,其需要服務器自己下載,會出現操作超時或者長時間不返回的情況。
任務列表從文件讀取圖片文件名
拼接url
發送3個并發請求
請求出現超時問題重試3次,最后如果失敗則放棄
出現非超時錯誤(如并發錯誤等)則一直重試,直到成功
請求成功后延遲400秒繼續發起下一個請求
處理返回的數據
寫入文件
代碼分析 引入依賴,創建讀取與寫入流const https = require("https"); const querystring = require("querystring"); const Rx = require("rxjs"); const readline = require("readline"); const fs = require("fs"); const imgStream = readline.createInterface({ // 創建行讀取流 input: fs.createReadStream("filelist.txt") }); const writeStream = fs.createWriteStream("output.txt"); // 創建寫入流使用Rx處理讀取并反饋結果給寫入
Rx.Observable.fromEvent(imgStream, "line") // 將行讀取流轉化為Rx的事件流 .takeUntil(Rx.Observable.fromEvent(imgStream, "close")) // 讀取流截止時終止Rx流 .map(img => generateData(img)) // 將文件名處理成post的數據 // 發起請求,并發3個,請求返回后延遲400ms后再進行下一步處理并發起下一個請求 .mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) .subscribe(data => { // 處理數據并寫入文件 let str = data.url; if (data.status === 200 && data.data.xxx.length) { zzz = data.data.xxx.map(x => x.zzz); str += ` ${JSON.stringify(zzz)}`; } writeStream.write(`${str} `); }, (err) => { console.log(err); console.log("!!!!!!!!!!!ERROR!!!!!!!!!"); }, () => { console.log("=====complete======"); writeStream.end(); });
其中的需要關注的點在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,這里內部requestAPI返回一個封裝了http異步請求并延遲400ms的Rx流,當請求完成并延遲完成后將數據返回上一層繼續進行處理(可以類比為Promise的then)
使用Rx的自定義流封裝一個帶錯誤重試機制的http請求const requestFacepp = dataStr => { const options = { hostname: "api.xxx.com", port: 443, path: "/xxx", method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded", "Content-Length": Buffer.byteLength(dataStr) } }; const reqData = querystring.parse(dataStr); const retry$ = new Rx.Subject(); // 觸發重試的流,當其發出數據時會使`retryWhen`觸發重試錯誤流 let retryTimes = 3; // 設置非正常失敗(超時)重試的上限 // 使用Rx的自定義流封裝一個帶錯誤重試機制的http請求,可以類比為new Promise // 但要注意的是Rx是流,即數據是可以持續的,而Promise則只有一個結果和狀態 return Rx.Observable.create(observer => { const req = https.request(options, res => { let data = ""; res.setEncoding("utf8"); res.on("data", chunk => { data += chunk; }); res.on("end", () => { if (res.statusCode === 200) { // 請求正常返回,向流內推送結果并結束 observer.next({ status: res.statusCode, url: reqData.image_url, data: JSON.parse(data) }); observer.complete(); } else { // 請求正常返回,但不是正常結果,拋出錯誤并重試 console.log(`retring: ${reqData.image_url}`); observer.error({ status: res.statusCode, url: reqData.image_url }); retry$.next(true); } }); }); req.setTimeout(4000, () => { // 設置請求4s超時,超時后終止,引發請求拋錯 req.abort(); }); req.on("error", err => { console.log(`retring(${retryTimes}): ${reqData.image_url}`); // 請求拋錯時重試,超出次數則終止本次請求 observer.error(`error: ${err.message}`); if (retryTimes > 0) { retryTimes--; retry$.next(true); } else { retry$.complete(); } }); req.write(dataStr); req.end(); return () => { req.abort() }; // 返回終止流的處理回調 }) .retryWhen(errs => errs.switchMap(err => { // 未超過次數返回重試流,超出則返回錯誤數據并終止本次Rx流 return retryTimes > 0 ? retry$ : Rx.Observable.of({ status: 500, url: reqData.image_url }); })); };收尾
到此就搬磚完畢,開個車讓他慢慢跑就可以了。
本篇展示了Rx在流數據處理與異步處理上的方式,邏輯與代碼都挺清晰、扁平。在處理交雜的邏輯時也不錯(重試部分)。如果喜歡或者有幫助的話可以后面在發一篇Rx在復雜DOM事件處理上的應用。;-)
本文始發于本人的公眾號:楓之葉。公眾號二維碼
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/83331.html
摘要:端輸入數據到端,對就是輸入流,得到的對象就是可讀流對就是輸出端得到的對象是可寫流。在中,這四種流都是的實例,它們都有事件,可讀流具有監聽數據到來的事件等,可寫流則具有監聽數據已傳給低層系統的事件等,和都同時實現了和的事件和接口。 原文地址在我的博客 node中的Buffer和Stream會給剛接觸Node的前端工程師們帶來困惑,原因是前端并沒有類似概念(or 有我們也沒意識到)。然而,...
摘要:中各種用于讀取數據的對象對象描述用于讀取文件代表客戶端請求或服務器端響應代表一個端口對象用于創建子進程的標準輸出流。如果子進程和父進程共享輸入輸出流,則子進程的標準輸出流被廢棄用于創建子進程的標準錯誤輸出流。 9. stream流 fs模塊中集中文件讀寫方法的區別 用途 使用異步方式 使用同步方式 將文件完整讀入緩存區 readFile readFileSync 將文件部...
摘要:為了防止某些文檔或腳本加載別的域下的未知內容,防止造成泄露隱私,破壞系統等行為發生。模式構建函數響應式前端架構過程中學到的經驗模式的不同之處在于,它主要專注于恰當地實現應用程序狀態突變。嚴重情況下,會造成惡意的流量劫持等問題。 今天是編輯周刊的日子。所以文章很多和周刊一樣。微信不能發鏈接,點了也木有用,所以請記得閱讀原文~ 發個動圖娛樂下: 使用 SVG 動畫制作游戲 使用 GASP ...
摘要:是在完成處理數據塊后需要調用的函數。這是寫數據成功與否的標志。若要發出故障信號,請用錯誤對象調用回調函數。雙工流的可讀性和可寫性操作完全獨立于彼此。這僅僅是將兩個特性組合成一個對象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一個數據集——和數組、字符串一樣。不...
摘要:內部架構上圖表示一個實例的組成部分部分緩沖數組內部函數部分緩沖鏈表內部函數實例必須實現的內部函數以及系統提供的回調函數。有三個參數,第一個為待處理的數據,第二個為編碼,第三個為回調函數。 Transform流特性 在開發中直接接觸Transform流的情況不是很多,往往是使用相對成熟的模塊或者封裝的API來完成流的處理,最為特殊的莫過于through2模塊和gulp流操作。那么,Tra...
閱讀 3514·2023-04-25 20:09
閱讀 3720·2022-06-28 19:00
閱讀 3035·2022-06-28 19:00
閱讀 3058·2022-06-28 19:00
閱讀 3131·2022-06-28 19:00
閱讀 2859·2022-06-28 19:00
閱讀 3014·2022-06-28 19:00
閱讀 2610·2022-06-28 19:00