Node.js从零开始-Stream(上)

这次开始看NodeJs的又一个核心——Stream的概念。比较冗长,而且概念性的东西很多,这次先看看流基本概念和写入流部分。

Stream——流,在Node.js里是一个处理流数据的抽象接口。stream模块提供了一些基本的API能够轻松构建实现流接口的对象。

Node.js提供了非常多的流对象,比如说HTTP的服务请求、process.stdout都是实例。

流是可读写的,所有流都是EventEmitter的实例。

流模块的访问方式:

1
const stream = require('stream')

了解流如何工作很重要,但是如果要创建新类型的流实例的话,流模块本身是最有用的,普通开发者一般很少直接使用流模块。

也就是说在做底层封装的时候会用到,如果是上层业务开发一般是用不到这玩意儿的。当然,我们还是要有梦想的,先从普通的了解开始。

流的类型

可读:如fs.createReadStream()

可写:如fs.createWriteStream()

双工流(Duplex):如net.Socket

双工转换流(Transform):可同时读写,并且可以转换数据类型,如zlib.createDeflate()

对象模式

由Node.js创建的所有流都专用于字符串和Buffer(or Unit8Array)对象。在对象模式下,可以实现其他类型的js值和流一起工作。

当流创建的时候,流实例可以使用objectMode选项切换到对象模式,但是将已经存在的流实例转换为对象模式则是不安全的。

Buffering

同时可读可写的流会把数据存储到内部缓冲区,在内部缓冲区可以使用writable._writableState.getBuffer()readable._readableState.buffer来检索。

缓冲的数据量大小设置取决于流构造函数的highWaterMark选项。

普通流,highWaterMark选项指定总字节数。

对象模式运行的流,highWaterMark指定对象的总数。

调用stream.push(chunk),数据被缓存在可读流中。如果流的消费者不调用stream.read(),数据会一直在内部队列中,直到被消耗。

一旦内部缓冲区的总大小达到highWaterMark指定的阈值,流会停止从底层资源读取数据,直到当前的缓冲的数据可以被使用(也就是说,流将停止调用用于填充read缓冲区的内部readable._read()方法)。

writable.write(chunk)方法被重复调用的时候,数据被缓存在可读流中。当内部write缓冲区大小低于highWaterMark阈值时,调用writable.write()会返回true,反之则返回false

streamAPI(特别是stream.pipe()方法)的关键目标是将数据缓冲限制为可接受的水平,使得不同速度的源和目标不会超过可用内存。

由于DuplexTransform流都允许同时可读和可写,每个都保留用于读和写的两个单独的内部缓冲区,并且允许每一方独立于另一方操作,同时保持合适和有效的数据流。

例如net.Socket实例就是Duplex流,它的可读端允许消耗从socket接收的数据,可写端则允许将数据写入socket

因为数据写入到socket可能比接收更快或者更慢,因此write端和read读独立操作是非常重要的。

流消耗者API

几乎所有的Node.js应用程序都以某种方式使用流。比如下面的HTTP服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const http = require('http')
const server = http.createServer((req, res) => {
let body = ''
req.setEncoding('utf8')
req.on('data', (chunk) => {
body += chunk
})
req.on('end', () => {
try {
const data = JSON.parse(body)
res.write(typeof data)
res.end()
} catch {
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1

Writable流,如这个例子里的res,暴露了将数据写入流的方法write()end()

Readable流可以使用EventEmitter API来通知应用程序代码,当数据可以从流中读取的时候,可以从流中以多种方式读取可用数据。

WritableReadable流都以各种方式使用EventEmitter API来交流流的当前状态。

将数据写入数据或从流中消耗数据的应用程序不需要直接实现流接口,通常就不需要调用require('stream')

Writable

常见例子
HTTP requests,客户端
HTTP responses, 服务端
fs写入流
zlib
crypto
TCP sockets
child process stdin
process.stdout, process.stderr

所有WritAble流通过stream.Writable类实现接口定义

WritAble流的特定实例可能以各种方式不同,但所有可写流都遵循相同的基本使用模式,如下例:

1
2
3
4
const myStream = getWritableStreamShomehow()
myStream.write('some data')
myStream.write('some more data')
mystream.end('done writing data')

stream.Writable

Event: ‘close’

当流和任何底层资源关闭时触发。之后不在发生任何事件,并且将不会进一步计算。

Event: ‘drain’

调用stream.write(chunk)返回false时触发。适当的时候将drain事件发送到数据流中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write () {
let ok = true
do {
i--
if (i === 0) {
writer.write(data, encoding, callback)
} else {
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
writer.once('drain', write)
}
}
}

Event: ‘error’

writeingpipe数据时发生错误时触发。当调用时,监听器回调传递一个错误参数。

P.S. error事件触发时流并不会关闭。

Event: ‘finish’

stream.end()调用后触发。所有数据都会被刷新到底层系统(这句不是很明白)。

1
2
3
4
5
6
7
8
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.end('This is the end\n')
writer.on('finish', () => {
console.error('All writes are now complete')
})

Event: ‘pipe’

可读流调用stream.pipe()时触发。写入目标集。

1
2
3
4
5
6
7
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', (src) => {
console.error('something is piping into the writer')
assert.equal(src, reader)
})
reader.pipe(writer)

Evnet: ‘unpipe’

stream.unpipe()调用时触发,从目标集中移除此Writable

1
2
3
4
5
6
7
8
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', (src) => {
console.error('Something has stopped piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)

writable.cork()

强制将所有写入的数据缓冲在内存中。调用writable.uncork()stream.end()刷新缓冲区数据。

主要目的是避免在流中写入许多小块数据的情况下,不会在内部缓冲区造成对性能有不理影响的备份。在这种情况下,实现writable._writtev()方法的实现可以以更优化的方式执行缓冲写入。

writable.end()

直接看官方例子,秒懂。

1
2
3
4
5
// write 'hello, ' and then end with 'world!'
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!

writable.write(chunk[, encoding][, callback])

writable.write()方法将一些数据写入流中,并在数据完全处理后调用提供的回调。

如果发生错误,回调可能会被调用,也可能不会以错误为第一个参数来调用。为了可靠地检测写错误,直接添加一个error事件的监听器是更好的做法。

低于highWaterMark配置时,返回true,否则false时,将进一步尝试写入数据直到触发drain事件。

这一段官方文档非常啰嗦,但总体意思其实就是围绕着触发drain避免内存问题来描述的。

1
2
3
4
5
6
7
8
9
10
function write (data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
write('hello', () => {
console.log('write complated, do more writes now')
})

如果是对象模式的话,将总会忽略encoding这个参数

writable.destroy([error])

销毁流,并发出传递的错误。调用这个方法之后,可写入流已经结束。这个方法不应该被覆盖,可以实现writable._destroy来替代。

短暂小结

看到这部分其实已经花了三个多小时了,一方面是因为原文不太好理解,另一方面确实底层概念有点多,需要花时间理解。立个flag,可读流部分两天后完成。

目前我学习Node.js的模块的流程其实是按照先核心概念(Modules,Events,Stream,Errors),再基本对象(Net,Path…)和基本模块(fs,HTTP,URL,Crypto…)这样的顺序来学习的。这样的一套流程相对比较科学合理,但是会存在断层问题,这个之后等全部系统过完完整一遍之后再进行练习补完。

坚持原创技术分享,您的支持将鼓励我继续创作!