国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

深入nodejs中流(stream)的理解

tianyu / 3017人閱讀

摘要:等文件一旦打開,立刻執行寫入操作發射一個緩存區清空的事件自定義可寫流為了實現可寫流,我們需要使用流模塊中的構造函數。

流的基本概念及理解
流是一種數據傳輸手段,是有順序的,有起點和終點,比如你要把數據從一個地方傳到另外一個地方
流非常重要,gulp,webpack,HTTP里的請求和響應,http里的socket都是流,包括后面壓縮,加密等

流為什么這么好用還這么重要呢?

因為有時候我們不關心文件的主體內容,只關心能不能取到數據,取到數據之后怎么進行處理

對于小型的文本文件,我們可以把文件內容全部讀入內存,然后再寫入文件,比如grunt-file-copy

對于體積較大的二進制文件,比如音頻、視頻文件,動輒幾個GB大小,如果使用這種方法,很容易使內存“爆倉”。

理想的方法應該是讀一部分,寫一部分,不管文件有多大,只要時間允許,總會處理完成,這里就需要用到流的概念

流是一個抽象接口,被Node中很多對象所實現,比如HTTP服務器request和response對象都是流

Node.js 中有四種基本的流類型:

Readable - 可讀的流 (例如 fs.createReadStream()).

Writable - 可寫的流 (例如 fs.createWriteStream()).

Duplex - 可讀寫的流 (例如 net.Socket).

Transform - 在讀寫過程中可以修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).

可以通過 require("stream") 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類
Readable streams可讀流
可讀流(Readable streams)是對提供數據的 源頭(source)的抽象
可讀流的例子包括:

HTTP responses, on the client :客戶端請求

HTTP requests, on the server :服務端請求

fs read streams :讀文件

zlib streams :壓縮

crypto streams :加密

TCP sockets :TCP協議

child process stdout and stderr :子進程標準輸出和錯誤輸出

process.stdin :標準輸入

所有的 Readable 都實現了 stream.Readable 類定義的接口

通過流讀取數據

用Readable創建對象readable后,便得到了一個可讀流

如果實現_read方法,就將流連接到一個底層數據源

流通過調用_read向底層請求數據,底層再調用流的push方法將需要的數據傳遞過來

當readable連接了數據源后,下游便可以調用readable.read(n)向流請求數據,同時監聽readable的data事件來接收取到的數據

下面簡單舉個可讀流的例子:

監聽可讀流的data事件,當你一旦開始監聽data事件的時候,流就可以讀文件的內容并且發射data,讀一點發射一點讀一點發射一點

默認情況下,當你監聽data事件之后,會不停的讀數據,然后觸發data事件,觸發完data事件后再次讀數據

讀的時候不是把文件整體內容讀出來再發射出來的,而且設置一個緩沖區,大小默認是64K,比如文件是128K,先讀64K發射出來,再讀64K在發射出來,會發射兩次

緩沖區的大小可以通過highWaterMark來設置

let fs = require("fs");
//通過創建一個可讀流
let rs = fs.createReadStream("./1.txt",{
    flags:"r",//我們要對文件進行何種操作
    mode:0o666,//權限位
    encoding:"utf8",//不傳默認為buffer,顯示為字符串
    start:3,//從索引為3的位置開始讀
    //這是我的見過唯一一個包括結束索引的
    end:8,//讀到索引為8結束
    highWaterMark:3//緩沖區大小
});
rs.on("open",function () {
    console.log("文件打開");
});
rs.setEncoding("utf8");//顯示為字符串
//希望流有一個暫停和恢復觸發的機制
rs.on("data",function (data) {
    console.log(data);
    rs.pause();//暫停讀取和發射data事件
    setTimeout(function(){
        rs.resume();//恢復讀取并觸發data事件
    },2000);
});
//如果讀取文件出錯了,會觸發error事件
rs.on("error",function () {
    console.log("error");
});
//如果文件的內容讀完了,會觸發end事件
rs.on("end",function () {
    console.log("讀完了");
});
rs.on("close",function () {
    console.log("文件關閉");
});

/**
文件打開
334
455
讀完了
文件關閉
**/
可讀流的簡單實現
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on("open", function () {
    console.log("open");
});
rs.on("data", function (data) {
    console.log(data);
});
rs.on("end", function () {
    console.log("end");
});
rs.on("close", function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require("fs");
let EventEmitter = require("events");

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || "r";
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on("end", function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on("newListener", (type) => {
            if (type == "data") {
                this.flowing = true;
                this.read();
            }
            if (type == "readable") {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit("error", err);
                }
            }
            this.fd = fd;
            this.emit("open");
        });
    }

    read(n) {
        if (typeof this.fd != "number") {
            return this.once("open", () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit("readable");
                        }

                        this.emit("end");
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit("readable");
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit("readable");
                    }
                    return this.emit("end");
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit("close");
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on("data", (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on("drain", () => {
            this.resume();
        });
        this.on("end", () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;
自定義可讀流
為了實現可讀流,引用Readable接口并用它構造新對象

我們可以直接把供使用的數據push出去。

當push一個null對象就意味著我們想發出信號——這個流沒有更多數據了

var stream = require("stream");
var util = require("util");
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+"");
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on("data", function(data){
    console.log("讀到數據: " + data.toString());//no maybe
});
counter.on("end", function(data){
    console.log("讀完了");
});
可讀流的兩種模式
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式決定了chunk數據流動的方式---自動流動還是手工流動。那如何觸發這兩種模式呢:

flowing mode: 注冊事件data、調用resume方法、調用pipe方法

paused mode: 調用pause方法(沒有pipe方法)、移除data事件 && unpipe所有pipe

如果 Readable 切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。 比如, 調用了 readable.resume() 方法卻沒有監聽 "data" 事件,或是取消了 "data" 事件監聽,就有可能出現這種情況

可讀流的三種狀態

在任意時刻,任意可讀流應確切處于下面三種狀態之一:

readable._readableState.flowing = null

readable._readableState.flowing = false

readable._readableState.flowing = true

兩種模式取決于可讀流flowing狀態:

若為true : flowing mode;

若為false : paused mode

flowing mode

通過注冊data、pipe、resume可以自動獲取所需要的數據,我們來看下源碼的實現
// data事件觸發flowing mode
 if (ev === "data") {
    // Start flowing on next tick if stream isn"t explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === "readable") {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume觸發flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug("resume");
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}
flowing mode的三種方法最后均是通過resume方法,將狀態變為true:state.flowing = true

paused mode

在paused mode下,需要手動地讀取數據,并且可以直接指定讀取數據的長度
可以通過監聽事件readable,觸發時手工讀取chunk數據:

當你監聽 readable事件的時候,會進入暫停模式

當監聽readable事件的時候,可讀流會馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區里const state = this._readableState;

self.read(0); 只填充緩存,但是并不會發射data事件,但是會發射stream.emit("readable");事件

this._read(state.highWaterMark); 每次調用底層的方法讀取的時候是讀取3個字節

let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
    highWaterMark:3
});
rs.on("readable",function(){
    console.log(rs._readableState.length);
    //read如果不加參數表示讀取整個緩存區數據
    //讀取一個字段,如果可讀流發現你要讀的字節小于等于緩存字節大小,則直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //當你讀完指定的字節后,如果可讀流發現剩下的字節已經比最高水位線小了。則會立馬再次讀取填滿 最高水位線
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});
注意:一旦注冊了readable事件,必須手工讀取read數據,否則數據就會流失,我們來看下源碼的實現
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug("emitReadable", state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug("emit readable");
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit("readable");
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug("endReadable", state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug("read", n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug("read: emitReadable", state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
flow方法直接read數據,將得到的數據通過事件data交付出去,然而此處沒有注冊data事件監控,因此,得到的chunk數據并沒有交付給任何對象,這樣數據就白白流失了,所以在觸發emit("readable")時,需要提前read數據
Writable streams可寫流
可寫流是對數據寫入"目的地"的一種抽象
Writable:可寫流的例子包括了:

HTTP requests, on the client 客戶端請求

HTTP responses, on the server 服務器響應

fs write streams 文件

zlib streams 壓縮

crypto streams 加密

TCP sockets TCP服務器

child process stdin 子進程標準輸入

process.stdout, process.stderr 標準輸出,錯誤輸出

下面舉個可寫流的簡單例子

當你往可寫流里寫數據的時候,不是會立刻寫入文件的,而是會很寫入緩存區,緩存區的大小就是highWaterMark,默認值是16K。然后等緩存區滿了之后再次真正的寫入文件里

let fs = require("fs");
let ws = fs.createWriteStream("./2.txt",{
   flags:"w",
   mode:0o666,
   start:3,
   highWaterMark:3//默認是16K
});

如果緩存區已滿 ,返回false,如果緩存區未滿,返回true

如果能接著寫,返回true,如果不能接著寫,返回false

按理說如果返回了false,就不能再往里面寫了,但是如果你真寫了,如果也不會丟失,會緩存在內存里。等緩存區清空之后再從內存里讀出來

let flag = ws.write("1");
console.log(flag);//true
flag =ws.write("2");
console.log(flag);//true
flag =ws.write("3");
console.log(flag);//false
flag =ws.write("4");
console.log(flag);//false

"drain" 事件

如果調用 stream.write(chunk) 方法返回 false,流將在適當的時機觸發 "drain" 事件,這時才可以繼續向流中寫入數據

當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 并且返回 false。 一旦所有當前所有緩存的數據塊都排空了(被操作系統接受來進行輸出), 那么 "drain" 事件就會被觸發

建議, 一旦 write() 返回 false, 在 "drain" 事件觸發前, 不能寫入任何數據塊

舉個簡單的例子說明一下:

let fs = require("fs");
let ws = fs.createWriteStream("2.txt",{
    flags:"w",
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//緩存區未滿
    //寫入方法是同步的,但是寫入文件的過程是異步的。在真正寫入文件后還會執行我們的回調函數
 while(flag && count>0){
     console.log("before",count);
     flag = ws.write((count)+"","utf8",(function (i) {
         return ()=>console.log("after",i);
     })(count));
     count--;
 }
}
write();//987
//監聽緩存區清空事件
ws.on("drain",function () {
    console.log("drain");
    write();//654 321
});
ws.on("error",function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
如果已經不再需要寫入了,可以調用end方法關閉寫入流,一旦調用end方法之后則不能再寫入
比如在ws.end();后寫ws.write("x");,會報錯write after end

"pipe"事件

linux精典的管道的概念,前者的輸出是后者的輸入

pipe是一種最簡單直接的方法連接兩個stream,內部實現了數據傳遞的整個過程,在開發的時候不需要關注內部數據的流動

這個方法從可讀流拉取所有數據, 并將數據寫入到提供的目標中

自動管理流量,將數據的滯留量限制到一個可接受的水平,以使得不同速度的來源和目標不會淹沒可用內存

默認情況下,當源數據流觸發 end的時候調用end(),所以寫入數據的目標不可再寫。傳 { end:false }作為options,可以保持目標流打開狀態

pipe方法的原理

var fs = require("fs");
var ws = fs.createWriteStream("./2.txt");
var rs = fs.createReadStream("./1.txt");
rs.on("data", function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on("drain", function () {
    rs.resume();
});
rs.on("end", function () {
    ws.end();
});
下面舉個簡單的例子說明一下pipe的用法:
let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
  highWaterMark:3
});
let ws = fs.createWriteStream("./2.txt",{
    highWaterMark:3
});
rs.pipe(ws);
//移除目標可寫流
rs.unpipe(ws);

當監聽可讀流data事件的時候會觸發回調函數的執行

可以實現數據的生產者和消費者速度的均衡

rs.on("data",function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});

監聽可寫流緩存區清空事件,當所有要寫入的數據寫入完成后,接著恢復從可讀流里讀取并觸發data事件

ws.on("drain",function () {
    console.log("drain");
    rs.resume();
});

unpipe

readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離

如果寫入的目標沒有傳入, 則所有綁定的流都會被分離

如果指定了寫入的目標,但是沒有綁定流,則什么事情都不會發生

簡單距離說明下unpipe的用法:
let fs = require("fs");
var from = fs.createReadStream("./1.txt");
var to = fs.createWriteStream("./2.txt");
from.pipe(to);
setTimeout(() => {
console.log("關閉向2.txt的寫入");
from.unpipe(writable);
console.log("手工關閉文件流");
to.end();
}, 1000);
pipe的簡單實現
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    highWaterMark: 3
});
let FileWriteStream = require("./WriteStream");
let ws = FileWriteStream("./2.txt",{
    flags:"w",
    encoding:"utf8",
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on("data", (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on("drain", ()=>{
        this.resume();
    });
    this.on("end", ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
自定義管道流
const stream = require("stream")

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log("push", ++index)
            this.push(index+"");
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log("寫入:", chunk.toString())
    }
})
readable.pipe(writable);
可寫流的簡單實現
let fs = require("fs");
 let FileWriteStream = require("./FileWriteStream");
 let ws = FileWriteStream("./2.txt",{
     flags:"w",
     encoding:"utf8",
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1","utf8",(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on("drain",()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require("events");
let util = require("util");
let fs = require("fs");
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||"utf8";
    this.flags = options.flags || "w";
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//開始寫入的索引位置
    this.open();//打開文件進行操作
    this.writing = false;//沒有在寫入過程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //如果監聽到end事件,而且要求自動關閉的話則關閉文件
    this.on("end", function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit("error",err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit("error", err);
        this.fd = fd;//把文件描述符賦給當前實例的fd屬性
        //發射open事件
        this.emit("open", fd);
    });
}
/**
 * 會判斷當前是后臺是否在寫入過程中,如果在寫入過程中,則把這個數據放在待處理的緩存中,如果不在寫入過程中,可以直接寫。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把數據放在緩存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有當緩存區寫滿了,那么清空緩存區的時候才會發射drain事件,否則 不發放
    this.needDrain = isFull;
    //如果說文件還沒有打開,則把寫入的方法壓入open事件的監聽函數。等文件一旦打開,立刻執行寫入操作
    if (typeof this.fd !== "number") {
         this.once("open", () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit("error",err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //發射一個緩存區清空的事件
        this.emit("drain");
        this.writing = false;
    }
}
module.exports = WriteStream;
自定義可寫流
為了實現可寫流,我們需要使用流模塊中的Writable構造函數。 我們只需給Writable構造函數傳遞一些選項并創建一個對象。唯一需要的選項是write函數,該函數揭露數據塊要往哪里寫

chunk通常是一個buffer,除非我們配置不同的流。

encoding是在特定情況下需要的參數,通常我們可以忽略它。

callback是在完成處理數據塊后需要調用的函數。這是寫數據成功與否的標志。若要發出故障信號,請用錯誤對象調用回調函數

var stream = require("stream");
var util = require("util");
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString("utf8"));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("項目:" + i, "utf8");
}
w.end("結束寫入",function(){
    console.log(stock);
});
Duplex streams可讀寫的流(雙工流)
Duplex 流是同時實現了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操作完全獨立于彼此,這僅僅是將兩個特性組合成一個對象

Duplex 流的實例包括了:

TCP sockets

zlib streams

crypto streams

下面簡單實現雙工流:
const {Duplex} = require("stream");
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+"");
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams轉換流
變換流(Transform streams) 是一種 Duplex 流。它的輸出與輸入是通過某種方式關聯的。和所有 Duplex 流一樣,變換流同時實現了 Readable 和 Writable 接口

轉換流的輸出是從輸入中計算出來的
對于轉換流,我們不必實現read或write的方法,我們只需要實現一個transform方法,將兩者結合起來。它有write方法的意思,我們也可以用它來push數據

變換流的實例包括:

zlib streams

crypto streams

下面簡單實現轉換流:
const {Transform} = require("stream");
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);
對象流
默認情況下,流處理的數據是Buffer/String類型的值。有一個objectMode標志,我們可以設置它讓流可以接受任何JavaScript對象
const {Transform} = require("stream");
let fs = require("fs");
let rs = fs.createReadStream("./users.json");
rs.setEncoding("utf8");
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/107224.html

相關文章

  • Node.js 中流操作實踐

    摘要:事件的觸發頻次同樣是由實現者決定,譬如在進行文件讀取時,可能每行都會觸發一次而在請求處理時,可能數的數據才會觸發一次。如果有參數傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節選自 Node.js Chea...

    chaos_G 評論0 收藏0
  • Node.js中流使用

    摘要:流是基于事件的用于管理和處理數據而且有不錯的效率借助事件和非阻塞庫流模塊允許在其可用的時候動態處理在其不需要的時候釋放掉使用流的好處舉一個讀取文件的例子使用同步讀取一個文件程序會被阻塞所有的數據都會被讀取到內存中換用讀取文件程序不會被阻塞但 流是基于事件的API,用于管理和處理數據,而且有不錯的效率.借助事件和非阻塞I/O庫,流模塊允許在其可用的時候動態處理,在其不需要的時候釋放掉. ...

    h9911 評論0 收藏0
  • 重讀 Gulp

    摘要:當接收一個回調函數的時候,一定要注意回調函數中的參數。主要作用就是用來讀取文件或者文件夾中的數據。表示文件的名稱指的是發生的變化使用技巧的進一步使用,可以參照中文官網中的技巧集。 Gulp 簡介 Gulp 對現在的前端而言,是一個稍微老舊的工具了,但是,為了復習以前學過的內容,還是把它翻出來,放在自己的博客中。說不定哪天又用到了呢。 需要說明的是,這里使用的 Gulp 版本是 3.9....

    vpants 評論0 收藏0
  • Node事件機制小記

    摘要:事件的監聽與事件的觸發事件一事件機制的實現中大部分的模塊,都繼承自模塊。從另一個角度來看,事件偵聽器模式也是一種事件鉤子的機制,利用事件鉤子導出內部數據或狀態給外部調用者。的核心就是事件發射與事件監聽器功能的封裝。 nodejs事件的監聽與事件的觸發 nodejs事件(Events)showImg(https://segmentfault.com/img/bV0Sqi?w=692&h=...

    airborne007 評論0 收藏0
  • 通過源碼解析 Node.js 中導流(pipe)實現

    摘要:回調函數中檢測該次寫入是否被緩沖,若是,觸發事件。若目標可寫流表示該寫入操作需要進行緩沖,則立刻將源可讀流切換至暫停模式。監聽源可讀流的事件,相應地結束目標可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導流(pipe)方法了。鑒于近期自己正在閱讀Node...

    defcon 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<