国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

node那點(diǎn)事(一) -- Readable streams(可讀流)

rickchen / 1179人閱讀

摘要:流的類(lèi)型中有四種基本的流類(lèi)型可讀的流例如可寫(xiě)的流例如可讀寫(xiě)的流例如在讀寫(xiě)過(guò)程中可以修改和變換數(shù)據(jù)的流例如可讀流可讀流有兩種模式流動(dòng)模式可讀流自動(dòng)讀取數(shù)據(jù),通過(guò)接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。

流的簡(jiǎn)介

流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來(lái)構(gòu)建實(shí)現(xiàn)流接口的對(duì)象。

Node.js 提供了多種流對(duì)象。 例如, HTTP 請(qǐng)求 和 process.stdout 就都是流的實(shí)例。

流可以是可讀的、可寫(xiě)的,或是可讀寫(xiě)的。所有的流都是 EventEmitter 的實(shí)例。

為什么要用流

這里我們舉一個(gè)簡(jiǎn)單的例子:

我們打算讀取一個(gè)文件,使用 fs.readFileSync 同步讀取一個(gè)文件,程序會(huì)被阻塞,所有的數(shù)據(jù)都會(huì)被讀取到內(nèi)存中。

換用 fs.readFile 讀取文件,程序不會(huì)被阻塞,但是所有的數(shù)據(jù)依舊會(huì)被一次性全部被讀取到內(nèi)存中。

當(dāng)處理大文件壓縮、歸檔、媒體文件和巨大的日志文件的時(shí)候,內(nèi)存使用就成了問(wèn)題,現(xiàn)在大家一般家用機(jī)內(nèi)存大多數(shù)都是8G、16G,軟件包還在日益增大,在這種情況下,流的優(yōu)勢(shì)就體現(xiàn)出來(lái)了。

流被設(shè)計(jì)為異步的方式,在內(nèi)存中只開(kāi)啟一個(gè)固定的空間,將文件化整為零,以流動(dòng)的方式進(jìn)行傳輸操作,解決了以上問(wèn)題。

流的類(lèi)型

Node.js 中有四種基本的流類(lèi)型:

Readable - 可讀的流 (例如 fs.createReadStream()).

Writable - 可寫(xiě)的流 (例如 fs.createWriteStream()).

Duplex - 可讀寫(xiě)的流 (例如 net.Socket).

Transform - 在讀寫(xiě)過(guò)程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).

可讀流(Readable Stream)

可讀流有兩種模式:

1、流動(dòng)模式(flowing):可讀流自動(dòng)讀取數(shù)據(jù),通過(guò)EventEmitter接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。

2、暫停模式(paused):必須顯式調(diào)用stream.read()方法來(lái)從流中讀取數(shù)據(jù)片段。

可以通過(guò)三種途徑切換到流動(dòng)模式:

監(jiān)聽(tīng) "data" 事件

調(diào)用 stream.resume() 方法

調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable

流動(dòng)模式切換到暫停模式的api有:

如果不存在管道目標(biāo),調(diào)用stream.pause()方法

如果存在管道目標(biāo),調(diào)用 stream.unpipe()并取消"data"事件監(jiān)聽(tīng)

可讀流事件:"data","readable","error","close","end"

我們可以想象下家用熱水器的模型,熱水器的水箱(buffer緩存區(qū))里面存著熱水(數(shù)據(jù)),在我們用熱水的時(shí)候,開(kāi)啟水龍頭,自來(lái)水會(huì)不斷的進(jìn)入水箱,再?gòu)乃溆伤堫^流出來(lái)供我們使用。這就是進(jìn)入了“flowing”模式。當(dāng)我們關(guān)閉水龍頭時(shí)候,水箱則會(huì)暫停進(jìn)水,水龍頭也會(huì)暫停出水,這是就進(jìn)入了“paused”模式。

flowing模式
const fs = require("fs")
const path = require("path")
const rs = fs.createReadStream(path.join(__dirname, "./1.txt"))

rs.setEncoding("utf8")

rs.on("data", (data) => {
    console.log(data)
})
paused模式
const fs = require("fs")
const path = require("path")
const rs = fs.createReadStream(path.join(__dirname, "./1.txt"))

rs.setEncoding("utf8")

rs.on("readable", () => {
    let d = rs.read(1)
    console.log(d)
})
實(shí)現(xiàn)原理 流動(dòng)模式原理

我們來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的流動(dòng)模式下的可讀流介紹其原理,由NODEJS官方文檔可知,流繼承自EventEmitter模塊,然后我們定義一些默認(rèn)參數(shù)、緩存區(qū)、模式:

let EventEmitter = require("events");
let fs = require("fs");

class ReadStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.flags = options.flags || "r";
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
        
        this.buffer = Buffer.alloc(this.highWaterMark);//定義緩存區(qū)大小
        
        this.pos = this.start; // pos 讀取的位置 可變 start不變的
        
        this.flowing = null; // null就是暫停模式
    }
}

module.exports = ReadStream;

接著在我們需要定義一個(gè)打開(kāi)文件的方法用于打開(kāi)文件。還有一個(gè)一個(gè)destroy方法,用于在文件操作出錯(cuò)或者讀完之后關(guān)閉文件。

open(){
    fs.open(this.path,this.flags,(err,fd)=>{
        if(err){
            this.emit("error",err);
            if(this.autoClose){ // 是否自動(dòng)關(guān)閉
                this.destroy();
            }
            return;
        }
        this.fd = fd; // 保存文件描述符
        this.emit("open"); // 文件打開(kāi)了
    });
}
 destroy(){
    // 先判斷有沒(méi)有fd 有關(guān)閉文件 觸發(fā)close事件
    if(typeof this.fd ==="number"){
        fs.close(this.fd,()=>{
            this.emit("close");
        });
        return;
    }
    this.emit("close"); // 銷(xiāo)毀
}

接著要在構(gòu)造函數(shù)中調(diào)用open方法,當(dāng)用戶(hù)綁定data監(jiān)聽(tīng)時(shí),修改可讀流的模式:

constructor(path,options){
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark|| 64*1024;
    this.start = options.start||0;
    this.end = options.end;
    this.encoding = options.encoding || null
    this.flowing = null; 
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.pos = this.start;
    
    this.open();//打開(kāi)文件 fd
    this.on("newListener",(eventName,callback)=>{
        if(eventName === "data"){
            // 相當(dāng)于用戶(hù)監(jiān)聽(tīng)了data事件
            this.flowing  = true;
            // 監(jiān)聽(tīng)了 就去讀
            this.read(); // 去讀內(nèi)容了
        }
    })
}

接下來(lái)我們實(shí)現(xiàn)最總要的read方法,首先要保證文件已經(jīng)打開(kāi),接著鍍組文件進(jìn)入緩存,觸發(fā)data事件傳入數(shù)據(jù),如果處于流動(dòng)模式,繼續(xù)讀取直到讀完文件。

read(){
    // 此時(shí)文件還沒(méi)打開(kāi)呢
    if(typeof this.fd !== "number"){
        // 當(dāng)文件真正打開(kāi)的時(shí)候 會(huì)觸發(fā)open事件,觸發(fā)事件后再執(zhí)行read,此時(shí)fd肯定有了
        return this.once("open",()=>this.read())
    }
    // 此時(shí)有fd了
    // 應(yīng)該填highWaterMark?
    // 想讀4個(gè) 寫(xiě)的是3  每次讀3個(gè)
    // 123 4
    let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
        // 讀到了多少個(gè) 累加
        if(bytesRead>0){
            this.pos+= bytesRead;
            let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
            this.emit("data",data);
            // 當(dāng)讀取的位置 大于了末尾 就是讀取完畢了
            if(this.pos > this.end){
                this.emit("end");
                this.destroy();
            }
            if(this.flowing) { // 流動(dòng)模式繼續(xù)觸發(fā)
                this.read(); 
            }
        }else{
            this.emit("end");
            this.destroy();
        }
    });
}

剩下的pause和resume方法,很簡(jiǎn)單

resume() {
    this.flowing = true;
    this.read();
}
pause() {
    this.flowing = false;
}

簡(jiǎn)單的流實(shí)現(xiàn)完成了,看一下完整代碼

let EventEmitter = require("events");
let fs = require("fs");

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || "r";
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null

        this.open();

        this.flowing = null; // null就是暫停模式

        this.buffer = Buffer.alloc(this.highWaterMark);

        this.pos = this.start; 
        this.on("newListener", (eventName,callback) => {
            if (eventName === "data") {
                this.flowing  = true;
                this.read(); 
            }
        })
    }
    
    read(){
        if (typeof this.fd !== "number") {
            return this.once("open", () => this.read())
        }
        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {
            if (bytesRead > 0) {
                this.pos += bytesRead;
                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);
                this.emit("data", data);
                if(this.pos > this.end){
                    this.emit("end");
                    this.destroy();
                }
                if(this.flowing) { 
                    this.read(); 
                }
            }else{
                this.emit("end");
                this.destroy();
            }
        });
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pause() {
        this.flowing = false;
    }
    
    destroy() {
        if(typeof this.fd === "number") {
            fs.close(this.fd, () => {
                this.emit("close");
            });
            return;
        }
        this.emit("close"); 
    };
    
    open() {
        fs.open(this.path, this.flags, (err,fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) { 
                    this.destroy();
                }
                return;
            }
            this.fd = fd; 
            this.emit("open"); 
        });
    }
}
module.exports = ReadStream;
暫停模式原理

以上是流動(dòng)模式的可讀流實(shí)現(xiàn)原理,暫停模式的可讀流原理與流動(dòng)模式的主要區(qū)別在于監(jiān)聽(tīng)readable事件的綁定與read方法,先實(shí)現(xiàn)監(jiān)聽(tīng)綁定readable事件回調(diào)函數(shù)時(shí),調(diào)用read方法讀取數(shù)據(jù)到緩存區(qū),定義一個(gè)讀取方法_read

constructor(path, options) {
    super();
    this.path = path;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;
    this.start = 0;
    this.end = options.end;
    this.flags = options.flags || "r";

    this.buffers = []; // 緩存區(qū) 
    this.pos = this.start;
    this.length = 0; // 緩存區(qū)大小
    this.emittedReadable = false;
    this.reading = false; // 不是正在讀取的
    this.open();
    this.on("newListener", (eventName) => {
        if (eventName === "readable") {
            this.read();
        }
    })
}

read(n) {
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading) {
            this.reading = true;
            this._read(); 
        }
    }
}

_read() {
    if (typeof this.fd !== "number") {
        return this.once("open", () => this._read());
    }
    let buffer = Buffer.alloc(this.highWaterMark);
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
        if (bytesRead > 0) {
            this.buffers.push(buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            this.length += bytesRead;
            this.reading = false;
            if (this.emittedReadable) {
                this.emittedReadable = false; 
                this.emit("readable");
            }
        } else {
            this.emit("end");
            this.destroy();
        }
    })
}

由api可知,暫停模式下的可讀流手動(dòng)調(diào)用read方法參數(shù)可以大于highWaterMark,為了處理這種情況,我們先寫(xiě)一個(gè)函數(shù)computeNewHighWaterMark,取到大于等于n的最小2的n次方的整數(shù)

function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }

然后寫(xiě)read方法,要考慮全n的各種情況,上代碼

read(n) { 

    if(n>this.length){
        // 更改緩存區(qū)大小  讀取五個(gè)就找 2的幾次放最近的
        this.highWaterMark = computeNewHighWaterMark(n)
        this.emittedReadable = true;
        this._read();
    }


    // 如果n>0 去緩存區(qū)中取吧
    let buffer=null;
    let index = 0; // 維護(hù)buffer的索引的
    let flag = true;
    if (n > 0 && n <= this.length) { // 讀的內(nèi)容 緩存區(qū)中有這么多
        // 在緩存區(qū)中取 [[2,3],[4,5,6]]
        buffer = Buffer.alloc(n); // 這是要返回的buffer
        let buf;
        while (flag&&(buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if(index === n){ // 拷貝夠了 不需要拷貝了
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i+1); // 取出留下的部分
                    // 如果有剩下的內(nèi)容 在放入到緩存中
                    if(bufferArr.length > 0){
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // 當(dāng)前緩存區(qū) 小于highWaterMark時(shí)在去讀取
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading){
            this.reading = true;
            this._read(); // 異步的
        }
    }
    return buffer
}

附上可讀流暫停模式的完整實(shí)現(xiàn)原理代碼

let fs = require("fs");
let EventEmitter = require("events");
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || "r";

        this.buffers = []; // 緩存區(qū) 
        this.pos = this.start;
        this.length = 0; // 緩存區(qū)大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在讀取的
        this.open();
        this.on("newListener", (eventName) => {
            if (eventName === "readable") {
                this.read();
            }
        })
    }
    read(n) { 

        if(n>this.length){
            // 更改緩存區(qū)大小  讀取五個(gè)就找 2的幾次放最近的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }


        // 如果n>0 去緩存區(qū)中取吧
        let buffer=null;
        let index = 0; // 維護(hù)buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 讀的內(nèi)容 緩存區(qū)中有這么多
            // 在緩存區(qū)中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 這是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷貝夠了 不需要拷貝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 取出留下的部分
                        // 如果有剩下的內(nèi)容 在放入到緩存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 當(dāng)前緩存區(qū) 小于highWaterMark時(shí)在去讀取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 異步的
            }
        }
        return buffer
    }
    // 封裝的讀取的方法
    _read() {
        // 當(dāng)文件打開(kāi)后在去讀取
        if (typeof this.fd !== "number") {
            return this.once("open", () => this._read());
        }
        // 上來(lái)我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默認(rèn)讀取的內(nèi)容放到緩存區(qū)中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 維護(hù)讀取的索引
                this.length += bytesRead;// 維護(hù)緩存區(qū)的大小
                this.reading = false;
                // 是否需要觸發(fā)readable事件
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默認(rèn)不觸發(fā)
                    this.emit("readable");
                }
            } else {
                this.emit("end");
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== "number") {
            return this.emit("close")
        }
        fs.close(this.fd, () => {
            this.emit("close")
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit("open");
        });
    }
}

module.exports = ReadStream;

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/93921.html

相關(guān)文章

  • node點(diǎn)事(二) -- Writable streams(可寫(xiě)流)、自定義流

    摘要:可寫(xiě)流可寫(xiě)流是對(duì)數(shù)據(jù)寫(xiě)入目的地的一種抽象。對(duì)象流的特點(diǎn)就是它有一個(gè)標(biāo)志,我們可以設(shè)置它讓流可以接受任何對(duì)象。 可寫(xiě)流(Writable Stream) 可寫(xiě)流是對(duì)數(shù)據(jù)寫(xiě)入目的地的一種抽象。 可寫(xiě)流的原理其實(shí)與可讀流類(lèi)似,當(dāng)數(shù)據(jù)過(guò)來(lái)的時(shí)候會(huì)寫(xiě)入緩存池,當(dāng)寫(xiě)入的速度很慢或者寫(xiě)入暫停時(shí)候,數(shù)據(jù)流便會(huì)進(jìn)入到隊(duì)列池緩存起來(lái),當(dāng)然即使緩存池滿(mǎn)了,剩余的數(shù)據(jù)也是存在內(nèi)存 可寫(xiě)流的簡(jiǎn)單用法如下代碼 l...

    mtunique 評(píng)論0 收藏0
  • 通過(guò)源碼解析 Node.js 中導(dǎo)流(pipe)的實(shí)現(xiàn)

    摘要:回調(diào)函數(shù)中檢測(cè)該次寫(xiě)入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫(xiě)流表示該寫(xiě)入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ健1O(jiān)聽(tīng)源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫(xiě)流。 在Node.js中,流(Stream)是其眾多原生對(duì)象的基類(lèi),它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過(guò)于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...

    defcon 評(píng)論0 收藏0
  • Node.js中流的使用

    摘要:流是基于事件的用于管理和處理數(shù)據(jù)而且有不錯(cuò)的效率借助事件和非阻塞庫(kù)流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理在其不需要的時(shí)候釋放掉使用流的好處舉一個(gè)讀取文件的例子使用同步讀取一個(gè)文件程序會(huì)被阻塞所有的數(shù)據(jù)都會(huì)被讀取到內(nèi)存中換用讀取文件程序不會(huì)被阻塞但 流是基于事件的API,用于管理和處理數(shù)據(jù),而且有不錯(cuò)的效率.借助事件和非阻塞I/O庫(kù),流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理,在其不需要的時(shí)候釋放掉. ...

    h9911 評(píng)論0 收藏0
  • 淺談node.js中的stream(流)

    摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個(gè)對(duì)象就意味著我們想發(fā)出信號(hào)這個(gè)流沒(méi)有更多數(shù)據(jù)了自定義可寫(xiě)流為了實(shí)現(xiàn)可寫(xiě)流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象。 前言 什么是流呢?看字面意思,我們可能會(huì)想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動(dòng);所以說(shuō)流是抽象的。在node.js中流是一個(gè)抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...

    elliott_hu 評(píng)論0 收藏0
  • [轉(zhuǎn)]nodejs Stream使用手冊(cè)

    摘要:方法也可以接收一個(gè)參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡(jiǎn)單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開(kāi)發(fā)程序的基本方法。 We should have some ways of connecting programs ...

    luffyZh 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

rickchen

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<