这次开始看NodeJs的又一个核心——Stream的概念。比较冗长,而且概念性的东西很多,这次先看看流基本概念和写入流部分。
Stream——流,在Node.js里是一个处理流数据的抽象接口。stream
模块提供了一些基本的API能够轻松构建实现流接口的对象。
Node.js提供了非常多的流对象,比如说HTTP的服务请求、process.stdout
都是实例。
流是可读写的,所有流都是EventEmitter
的实例。
流模块的访问方式:
1 | const stream = require('stream') |
了解流如何工作很重要,但是如果要创建新类型的流实例的话,流模块本身是最有用的,普通开发者一般很少直接使用流模块。
也就是说在做底层封装的时候会用到,如果是上层业务开发一般是用不到这玩意儿的。当然,我们还是要有梦想的,先从普通的了解开始。
流的类型
可读:如
fs.createReadStream()
可写:如
fs.createWriteStream()
双工流(Duplex):如
net.Socket
双工转换流(Transform):可同时读写,并且可以转换数据类型,如
zlib.createDeflate()
对象模式
由Node.js创建的所有流都专用于字符串和Buffer(or Unit8Array)
对象。在对象模式下,可以实现其他类型的js值和流一起工作。
当流创建的时候,流实例可以使用objectMode
选项切换到对象模式,但是将已经存在的流实例转换为对象模式则是不安全的。
Buffering
同时可读可写的流会把数据存储到内部缓冲区,在内部缓冲区可以使用writable._writableState.getBuffer()
或readable._readableState.buffer
来检索。
缓冲的数据量大小设置取决于流构造函数的highWaterMark
选项。
普通流,highWaterMark
选项指定总字节数。
对象模式运行的流,highWaterMark
指定对象的总数。
调用stream.push(chunk)
,数据被缓存在可读流中。如果流的消费者不调用stream.read()
,数据会一直在内部队列中,直到被消耗。
一旦内部缓冲区的总大小达到highWaterMark
指定的阈值,流会停止从底层资源读取数据,直到当前的缓冲的数据可以被使用(也就是说,流将停止调用用于填充read
缓冲区的内部readable._read()
方法)。
当writable.write(chunk)
方法被重复调用的时候,数据被缓存在可读流中。当内部write
缓冲区大小低于highWaterMark
阈值时,调用writable.write()
会返回true
,反之则返回false
。
stream
API(特别是stream.pipe()
方法)的关键目标是将数据缓冲限制为可接受的水平,使得不同速度的源和目标不会超过可用内存。
由于Duplex
和Transform
流都允许同时可读和可写,每个都保留用于读和写的两个单独的内部缓冲区,并且允许每一方独立于另一方操作,同时保持合适和有效的数据流。
例如net.Socket
实例就是Duplex
流,它的可读端允许消耗从socket
接收的数据,可写端则允许将数据写入socket
。
因为数据写入到socket
可能比接收更快或者更慢,因此write
端和read
读独立操作是非常重要的。
流消耗者API
几乎所有的Node.js应用程序都以某种方式使用流。比如下面的HTTP
服务。
1 | const http = require('http') |
Writable
流,如这个例子里的res
,暴露了将数据写入流的方法write()
和end()
。
Readable
流可以使用EventEmitter API
来通知应用程序代码,当数据可以从流中读取的时候,可以从流中以多种方式读取可用数据。
Writable
和Readable
流都以各种方式使用EventEmitter API
来交流流的当前状态。
将数据写入数据或从流中消耗数据的应用程序不需要直接实现流接口,通常就不需要调用require('stream')
。
Writable
流
常见例子
HTTP requests
,客户端
HTTP responses
, 服务端
fs
写入流
zlib
crypto
TCP sockets
child process stdin
process.stdout, process.stderr
所有WritAble
流通过stream.Writable
类实现接口定义
当WritAble
流的特定实例可能以各种方式不同,但所有可写流都遵循相同的基本使用模式,如下例:
1 | const myStream = getWritableStreamShomehow() |
stream.Writable
类
Event: ‘close’
当流和任何底层资源关闭时触发。之后不在发生任何事件,并且将不会进一步计算。
Event: ‘drain’
调用stream.write(chunk)
返回false
时触发。适当的时候将drain
事件发送到数据流中。
1 | function writeOneMillionTimes(writer, data, encoding, callback) { |
Event: ‘error’
在writeing
或pipe
数据时发生错误时触发。当调用时,监听器回调传递一个错误参数。
P.S. error
事件触发时流并不会关闭。
Event: ‘finish’
stream.end()
调用后触发。所有数据都会被刷新到底层系统(这句不是很明白)。
1 | const writer = getWritableStreamSomehow() |
Event: ‘pipe’
可读流调用stream.pipe()
时触发。写入目标集。
1 | const writer = getWritableStreamSomehow() |
Evnet: ‘unpipe’
stream.unpipe()
调用时触发,从目标集中移除此Writable
。
1 | const writer = getWritableStreamSomehow() |
writable.cork()
强制将所有写入的数据缓冲在内存中。调用writable.uncork()
或stream.end()
刷新缓冲区数据。
主要目的是避免在流中写入许多小块数据的情况下,不会在内部缓冲区造成对性能有不理影响的备份。在这种情况下,实现writable._writtev()
方法的实现可以以更优化的方式执行缓冲写入。
writable.end()
直接看官方例子,秒懂。
1 | // write 'hello, ' and then end with 'world!' |
writable.write(chunk[, encoding][, callback])
writable.write()
方法将一些数据写入流中,并在数据完全处理后调用提供的回调。
如果发生错误,回调可能会被调用,也可能不会以错误为第一个参数来调用。为了可靠地检测写错误,直接添加一个error
事件的监听器是更好的做法。
低于highWaterMark
配置时,返回true
,否则false
时,将进一步尝试写入数据直到触发drain
事件。
这一段官方文档非常啰嗦,但总体意思其实就是围绕着触发drain
避免内存问题来描述的。
1 | function write (data, cb) { |
如果是对象模式的话,将总会忽略encoding
这个参数
writable.destroy([error])
销毁流,并发出传递的错误。调用这个方法之后,可写入流已经结束。这个方法不应该被覆盖,可以实现writable._destroy
来替代。
短暂小结
看到这部分其实已经花了三个多小时了,一方面是因为原文不太好理解,另一方面确实底层概念有点多,需要花时间理解。立个flag,可读流部分两天后完成。
目前我学习Node.js的模块的流程其实是按照先核心概念(Modules,Events,Stream,Errors),再基本对象(Net,Path…)和基本模块(fs,HTTP,URL,Crypto…)这样的顺序来学习的。这样的一套流程相对比较科学合理,但是会存在断层问题,这个之后等全部系统过完完整一遍之后再进行练习补完。