• QQ咨詢:4001806960
  • 咨詢熱線:400-180-6960

NodeJs中的stream(流)- 基礎篇

作者:第一期架構課學員日期:2018-04-08 18:20:02 點擊:357

一、什麽是Stream(流)

流(stream)在 Node.js 中是處理流數據的抽象接口(abstract interface)。 stream 模塊提供了基礎的 API 。使用這些 API 可以很容易地來構建實現流接口的對象。

流是可讀的、可寫的,或是可讀寫的。

二、NodeJs中的Stream的幾種類型

Node.js 中有四種基本的流類型:

  • Readable - 可讀的流(fs.createReadStream())
  • Writable - 可寫的流(fs.createWriteStream())
  • Duplex - 可讀寫的流(net.Socket)
  • Transform - 在讀寫過程中可以修改和變換數據的 Duplex 流 (例如 zlib.createDeflate())

NodeJs中關于流的操作被封裝到了Stream模塊中,這個模塊也被多個核心模塊所引用。

const stream = require('stream');

在 NodeJS 中對文件的處理多數使用流來完成

  • 普通文件
  • 設備文件(stdin、stdout)
  • 網絡文件(http、net)

注:在NodeJs中所有的Stream(流)都是EventEmitter的實例

Example:

1.將1.txt的文件內容讀取爲流數據

const fs = require('fs');
 
// 創建一個可讀流(生産者)
let rs = fs.createReadStream('./1.txt');

通過fs模塊提供的createReadStream()可以輕松創建一個可讀的文件流。但我們並有直接使用Stream模塊,因爲fs模塊內部已經引用了Stream模塊並做了封裝。所以說 流(stream)在 Node.js 中是處理流數據的抽象接口,提供了基礎Api來構建實現流接口的對象。

var rs = fs.createReadStream(path,[options]);

1.path 讀取文件的路徑

2.options

  • flags打開文件的操作, 默認爲’r’
  • mode 權限位 0o666
  • encoding默認爲null
  • start開始讀取的索引位置
  • end結束讀取的索引位置(包括結束位置)
  • highWaterMark讀取緩存區默認的大小64kb

Node.js 提供了多種流對象。 例如:

  • HTTP 請求 (request response)
  • process.stdout 就都是流的實例。

2.創建可寫流(消費者)處理可讀流

將1.txt的可讀流 寫入到2.txt文件中 這時我們需要一個可寫流

const fs = require('fs');
// 創建一個可寫流
let ws = fs.createWriteStream('./2.txt');
// 通過pipe讓可讀流流入到可寫流 寫入文件
rs.pipe(ws);
var ws = fs.createWriteStream(path,[options]);

1.path 讀取文件的路徑

2.options

  • flags打開文件的操作, 默認爲’w’
  • mode 權限位 0o666
  • encoding默認爲utf8
  • autoClose:true是否自動關閉文件
  • highWaterMark讀取緩存區默認的大小16kb

pipe 它是Readable流的方法,相當于一個”管道”,數據必須從上遊 pipe 到下遊,也就是從一個 readable 流 pipe 到 writable 流。
後續將深入將介紹pipe。




 

如上圖,我們把文件比作裝水的桶,而水就是文件裏的內容,我們用一根管子(pipe)連接兩個桶使得水從一個桶流入另一個桶,這樣就慢慢的實現了大文件的傳輸過程。

三、爲什麽應該使用 Stream

當有用戶在線看視頻,假定我們通過HTTP請求返回給用戶視頻內容

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
fs.readFile(videoPath, (err, data) => {
res.end(data);
});
}).listen(8080);

但這樣有兩個明顯的問題

1.視頻文件需要全部讀取完,才能返回給用戶,這樣等待時間會很長
2.視頻文件一次全放入內存中,內存吃不消

用流可以將視頻文件一點一點讀到內存中,再一點一點返回給用戶,讀一部分,寫一部分。(利用了 HTTP 協議的 Transfer-Encoding: chunked 分段傳輸特性),用戶體驗得到優化,同時對內存的開銷明顯下降

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

四、可讀流(Readable Stream)

可讀流(Readable streams)是對提供數據的源頭(source)的抽象。

例如:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets
  • process.stdin

所有的 Readable 都實現了 stream.Readable 類定義的接口。

可讀流的兩種模式(flowing 和 paused)

1.在 flowing 模式下, 可讀流自動從系統底層讀取數據,並通過 EventEmitter 接口的事件盡快將數據提供給應用。

2.在 paused 模式下,必須顯式調用 stream.read()方法來從流中讀取數據片段。

所有初始工作模式爲paused的Readable流,可以通過下面三種途徑切換爲flowing模式:

  • 監聽’data’事件
  • 調用stream.resume()方法
  • 調用stream.pipe()方法將數據發送到Writable

流動模式flowing

流切換到流動模式 監聽data事件

const rs = fs.createReadStream('./1.txt');
const ws = fs.createWriteStream('./2.txt');
rs.on('data', chunk => {
ws.write(chunk);
});
ws.on('end', () => {
ws.end();
});

如果寫入的速度跟不上讀取的速度,有可能導致數據丟失。正常的情況應該是,寫完一段,再讀取下一段,如果沒有寫完的話,就讓讀取流先暫停,等寫完再繼續。

var fs = require('fs');
// 讀取highWaterMark(3字節)數據,讀完之後填充緩存區,然後觸發data事件
var rs = fs.createReadStream(sourcePath, {
highWaterMark: 3
});
var ws = fs.createWriteStream(destPath, {
highWaterMark: 3
});
 
rs.on('data', function(chunk) { // 當有數據流出時,寫入數據
if (ws.write(chunk) === false) { // 如果沒有寫完,暫停讀取流
rs.pause();
}
});
 
ws.on('drain', function() { // 緩沖區清空觸發drain事件 這時再繼續讀取
rs.resume();
});
 
rs.on('end', function() { // 當沒有數據時,關閉數據流
ws.end();
});

或者使用更直接的pipe

fs.createReadStream(sourcePath).pipe(fs.createWriteStream(destPath));

暫停模式paused

1.在流沒有 pipe() 時,調用 pause() 方法可以將流暫停
2.pipe() 時,需要移除所有 data 事件的監聽,再調用 unpipe() 方法

read(size)

流在暫停模式下需要程序顯式調用 read() 方法才能得到數據。read() 方法會從內部緩沖區中拉取並返回若幹數據,當沒有更多可用數據時,會返回null。read()不會觸發’data’事件。

使用 read() 方法讀取數據時,如果傳入了 size 參數,那麽它會返回指定字節的數據;當指定的size字節不可用時,則返回null。如果沒有指定size參數,那麽會返回內部緩沖區中的所有數據。
NodeJS 爲我們提供了一個 readable 的事件,事件在可讀流准備好數據的時候觸發,也就是先監聽這個事件,收到通知又數據了我們再去讀取就好了:

const fs = require('fs');
rs = fs.createReadStream(sourcePath);
 
// 當你監聽 readable事件的時候,會進入暫停模式
rs.on('readable', () => {
console.log(rs._readableState.length);
// read如果不加參數表示讀取整個緩存區數據
// 讀取一個字段,如果可讀流發現你要讀的字節小于等于緩存字節大小,則直接返回
let ch = rs.read(1);
});

暫停模式 緩存區的數據以鏈表的形式保存在BufferList中

五、可寫流(Writable Stream)

可寫流是對數據流向設備的抽象,用來消費上遊流過來的數據,通過可寫流程序可以把數據寫入設備,常見的是本地磁盤文件或者 TCP、HTTP 等網絡響應。

Writable 的例子包括了:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

所有 Writable 流都實現了 stream.Writable 類定義的接口。

process.stdin.pipe(process.stdout);

process.stdout 是一個可寫流,程序把可讀流 process.stdin 傳過來的數據寫入的標准輸出設備。在了解了可讀流的基礎上理解可寫流非常簡單,流就是有方向的數據,其中可讀流是數據源,可寫流是目的地,中間的管道環節是雙向流。

可寫流使用

調用可寫流實例的 write() 方法就可以把數據寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 設置編碼格式
rs.on('data', chunk => {
ws.write(chunk); // 寫入數據
});

監聽了可讀流的 data 事件就會使可讀流進入流動模式,我們在回調事件裏調用了可寫流的 write() 方法,這樣數據就被寫入了可寫流抽象的設備destPath中。

write() 方法有三個參數

  • chunk {String| Buffer},表示要寫入的數據
  • encoding 當寫入的數據是字符串的時候可以設置編碼
  • callback 數據被寫入之後的回調函數

‘drain’事件

如果調用 stream.write(chunk) 方法返回 false,表示當前緩存區已滿,流將在適當的時機(緩存區清空後)觸發 ‘drain

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 設置編碼格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 寫入數據
if (!flag) { // 如果緩存區已滿暫停讀取
rs.pause();
}
});
 
ws.on('drain', () => {
rs.resume(); // 緩存區已清空 繼續讀取寫入
});

六、總結

stream(流)分爲可讀流(flowing mode 和 paused mode)、可寫流、可讀寫流,Node.js 提供了多種流對象。 例如, HTTP 請求 和 process.stdout 就都是流的實例。stream 模塊提供了基礎的 API 。使用這些 API 可以很容易地來構建實現流接口的對象。它們底層都調用了stream模塊並進行封裝。

後續我們將繼續對stream深入解析以及Readable Writable pipe的實現

<

上一篇: Promise 詳解與實現(遵循Promise/A+規範)

下一篇: 理解TCP/IP協議(一)