Node.js从零开始-Stream(中)

接上回,顺道吐槽一下——MMP果不其然跳票了(:з」∠)

Readable Streams

可读流是对消耗数据的源的抽象。其包括的例子有:

客户端的HTTP responses
服务端的HTTP requests
fs读取的流
zlib
crypto
TCP sockets
子进程的stdoutstderr
process.stdin

所有可读流都是由stream.Readable类定义的接口实现的。

两种模式

flowing:从底层系统自动读取数据,并且通过EventEmitter接口使用事件尽快提供给应用程序。

paused:必须显式调用stream.read()方法从流中读取数据块。

所有可读流都是从paused模式开始的,可以通过下列方式转化为flowing模式:

  • 添加一个’data’事件处理器
  • 调用stream.resume()方法
  • 调用stream.pipe()方法发送数据给可写流

同时也可以根据下列方式转回paused模式:

  • 若无管道目标,调用stream.pause()方法。
  • 若有管道目标,通过移除任何’data’事件处理器,并通过调用stream.unpipe()方法删除所有管道目标。

记住一个重要的概念——只有在提供消耗或忽略该数据的机制之前,可读流才会生成数据。

这里需要注意的一点:如果一个可读流转化到flowing模式的过程中没有消费者去处理这个数据,那这些数据将会丢失。

比如readable.resume()方法没有侦听器去绑定到data事件上,或者当data事件处理器被移除的时候。

三个状态

readable._readableState.flowing的值可能为三个状态——null, false, true

当为null的时候,没有提供消耗流数据的机制,所以流不会生成数据。

null状态下,如果给data事件添加侦听器,同时调用readable.pipe()方法,或调用readable.resume()方法,这将会转换到true状态,引起可读流积极发射事件。

调用readable.pause()readable.unpipe()或收到back pressure将会把状态切换到false,暂时停止事件的流动,但不会停止生成数据,数据可能在流内部缓冲区内累积。

false状态下,为data事件添加侦听器不会导致状态切换到true

选择一个consummer

consummer应该指定一个,不要采取多个方法来消耗一个流,一般来说,推荐使用readable.pipe()方法。

如果需要更加细粒度的,可以使用readable.pause()/readable.resume()这两个API。

stream.Readable

  • Event: ‘close’

    当流和任何底层资源关闭时,发出close事件,该事件表示不再发生任何事件,并且不会进一步计算。

  • Event: ‘data’

    数据块chunk:, , 。对于不在对象模式下运行的流,将是stringBuffer。在对象模式运行下的流,可以是除了null之外的任意JS值。

    data事件在流将一大堆数据的所有权交给consummer时触发。例如调用readable.pipe()readable.resume(),或者附到侦听器时。当readable.read()方法调用返回一大堆数据的同时,也会触发data事件。

    data事件侦听器附加到未被明确暂停的流,将会切换到flowing模式。数据将会在可用时立即通过。

    如果已经使用readable.setEncoding()方法为流指定了默认的编码,则侦听器回调将作为string传递数据块;否则将作为Buffer传递。

  • Event: ‘end’

    当没有更多的数据要流中消耗时触发end

  • Event: ‘error’

    error事件可能在任何时候由Readable实现触发。如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能发生这样的情况。

  • Event: ‘readable’

    当流有可读的数据时触发。

1
2
3
4
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// there is some data to read now
});

foo.txt为空文件时。

1
2
3
4
5
6
7
8
9
10
11
const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log('readable:', rr.read());
});
rr.on('end', () => {
console.log('end');
});
// 将输出下面的信息
// readable: null
// end

readable.isPaused()readable.pause()

readable.isPaused()用来判断流是否处于pause状态

readable.pause()时将导致flowing模式中的流停止发送data事件,切换出flowing模式。此时,任何数据都会保留在内部缓冲区。

1
2
3
4
5
6
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false

readable.pipe(destination[, options])

readable.pipe方法将附加一个可写流到可读流中,使其自动切换到flowing模式并将所有数据到附加到Writable上。将自动管理数据流,以便目标可写流不被更快的可读流消耗。如下:

1
2
3
4
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);

还可以将多个可写流附加到单个可读流,readable.pipe()方法返回对目标流的引用,从而能设置管道流的链接。

1
2
3
4
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

如果可读流在处理过程中发生错误,Writable目标不会自动关闭,冰球需要手动关闭每个流,以防止内存泄漏。

process.stderrprocess.stdout可写流从不关闭,直到Node.js进程退出

readable.read([size])

readable.read()方法将一些数据从内部缓冲区中抽出并返回。若无数据可读,则返回null

默认情况下,数据将作为Buffer对象返回,除非指定编码,或者以对象模式运行。

只有在paused模式下可以调用该方法,在flowing模式下,是自动调用的,直到内部缓冲区完全耗尽。

通常情况下,建议开发者避免使用readable事件和readable.read()方法,有利于使用readable.pipe()data事件。

readable.resume()

readable.resume()方法使显式paused下的可读流继续发送data事件,将流切换为flowing模式。

readable.resume()方法可用于完全消耗流中的数据,而不会实际处理任何数据。如下所示:

1
2
3
4
5
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});

readable.setEncoding(encoding)

这个比较好理解,就是设置流的编码,例如readable.setEncoding('hex')会作为十六进制字符串格式编码。

1
2
3
4
5
6
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});

readable.unpipe([destination])

用以分离先前连接的可写流。如果没有指定参数,所有的管道都会被分离。如果指定了,但是没有设置管道,那就什么都不做。

readable.unshift(chunk)

把数据重新返回到缓冲区,用来处理某些不消耗数据的情况,不能在end之后调用。

官方推荐使用Transform流来代替stream.unshift()

在执行读操作的过程中应避免调用这个方法,有可能导致一些预料之外的结果。

readable.wrap(stream)

这是老版本的API,无视。

readable.destroy([error])

破坏流,并且发出错误。调用之后,可读流会释放任何内部资源。

REF

官方文档

坚持原创技术分享,您的支持将鼓励我继续创作!