接上回,顺道吐槽一下——MMP果不其然跳票了(:з」∠)
Readable Streams
可读流是对消耗数据的源的抽象。其包括的例子有:
客户端的
HTTP responses
服务端的HTTP requests
fs
读取的流zlib
流crypto
流TCP sockets
子进程的stdout
和stderr
process.stdin
所有可读流都是由stream.Readable
类定义的接口实现的。
两种模式
flowing
:从底层系统自动读取数据,并且通过EventEmitter
接口使用事件尽快提供给应用程序。
paused
:必须显式调用stream.read()
方法从流中读取数据块。
所有可读流都是从paused
模式开始的,可以通过下列方式转化为flowing
模式:
- 添加一个’data’事件处理器
- 调用
stream.resume()
方法 - 调用
stream.pipe()
方法发送数据给可写流
同时也可以根据下列方式转回paused
模式:
- 若无管道目标,调用
stream.pause()
方法。 - 若有管道目标,通过移除任何’data’事件处理器,并通过调用
stream.unpipe()
方法删除所有管道目标。
记住一个重要的概念——只有在提供消耗或忽略该数据的机制之前,可读流才会生成数据。
这里需要注意的一点:如果一个可读流转化到flowing
模式的过程中没有消费者去处理这个数据,那这些数据将会丢失。
比如readable.resume()
方法没有侦听器去绑定到data
事件上,或者当data
事件处理器被移除的时候。
三个状态
readable._readableState.flowing
的值可能为三个状态——null
, false
, true
当为null
的时候,没有提供消耗流数据的机制,所以流不会生成数据。
在null
状态下,如果给data
事件添加侦听器,同时调用readable.pipe()
方法,或调用readable.resume()
方法,这将会转换到true
状态,引起可读流积极发射事件。
调用readable.pause()
,readable.unpipe()
或收到back pressure
将会把状态切换到false
,暂时停止事件的流动,但不会停止生成数据,数据可能在流内部缓冲区内累积。
在false
状态下,为data
事件添加侦听器不会导致状态切换到true
选择一个consummer
consummer
应该指定一个,不要采取多个方法来消耗一个流,一般来说,推荐使用readable.pipe()
方法。
如果需要更加细粒度的,可以使用readable.pause()
/readable.resume()
这两个API。
stream.Readable
类
Event: ‘close’
当流和任何底层资源关闭时,发出
close
事件,该事件表示不再发生任何事件,并且不会进一步计算。Event: ‘data’
数据块
chunk
:, , 。对于不在对象模式下运行的流,将是 string
或Buffer
。在对象模式运行下的流,可以是除了null
之外的任意JS值。data
事件在流将一大堆数据的所有权交给consummer
时触发。例如调用readable.pipe()
,readable.resume()
,或者附到侦听器时。当readable.read()
方法调用返回一大堆数据的同时,也会触发data
事件。将
data
事件侦听器附加到未被明确暂停的流,将会切换到flowing
模式。数据将会在可用时立即通过。如果已经使用
readable.setEncoding()
方法为流指定了默认的编码,则侦听器回调将作为string
传递数据块;否则将作为Buffer
传递。Event: ‘end’
当没有更多的数据要流中消耗时触发
end
。Event: ‘error’
error
事件可能在任何时候由Readable
实现触发。如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能发生这样的情况。Event: ‘readable’
当流有可读的数据时触发。
1 | const readable = getReadableStreamSomehow(); |
当foo.txt
为空文件时。
1 | const fs = require('fs'); |
readable.isPaused()
和readable.pause()
readable.isPaused()
用来判断流是否处于pause
状态
readable.pause()
时将导致flowing
模式中的流停止发送data
事件,切换出flowing
模式。此时,任何数据都会保留在内部缓冲区。
1 | const readable = new stream.Readable(); |
readable.pipe(destination[, options])
readable.pipe
方法将附加一个可写流到可读流中,使其自动切换到flowing
模式并将所有数据到附加到Writable
上。将自动管理数据流,以便目标可写流不被更快的可读流消耗。如下:
1 | const readable = getReadableStreamSomehow(); |
还可以将多个可写流附加到单个可读流,readable.pipe()
方法返回对目标流的引用,从而能设置管道流的链接。
1 | const r = fs.createReadStream('file.txt'); |
如果可读流在处理过程中发生错误,Writable
目标不会自动关闭,冰球需要手动关闭每个流,以防止内存泄漏。
process.stderr
和process.stdout
可写流从不关闭,直到Node.js进程退出
readable.read([size])
readable.read()
方法将一些数据从内部缓冲区中抽出并返回。若无数据可读,则返回null
。
默认情况下,数据将作为Buffer
对象返回,除非指定编码,或者以对象模式运行。
只有在paused
模式下可以调用该方法,在flowing
模式下,是自动调用的,直到内部缓冲区完全耗尽。
通常情况下,建议开发者避免使用readable
事件和readable.read()
方法,有利于使用readable.pipe()
或data
事件。
readable.resume()
readable.resume()
方法使显式paused
下的可读流继续发送data
事件,将流切换为flowing
模式。
readable.resume()
方法可用于完全消耗流中的数据,而不会实际处理任何数据。如下所示:
1 | getReadableStreamSomehow() |
readable.setEncoding(encoding)
这个比较好理解,就是设置流的编码,例如readable.setEncoding('hex')
会作为十六进制字符串格式编码。
1 | const readable = getReadableStreamSomehow(); |
readable.unpipe([destination])
用以分离先前连接的可写流。如果没有指定参数,所有的管道都会被分离。如果指定了,但是没有设置管道,那就什么都不做。
readable.unshift(chunk)
把数据重新返回到缓冲区,用来处理某些不消耗数据的情况,不能在end
之后调用。
官方推荐使用Transform
流来代替stream.unshift()
。
在执行读操作的过程中应避免调用这个方法,有可能导致一些预料之外的结果。
readable.wrap(stream)
这是老版本的API,无视。
readable.destroy([error])
破坏流,并且发出错误。调用之后,可读流会释放任何内部资源。