Node.js从零开始-Stream(下)

双工和转换流,这是流的最后一部分了,因为是相对比较底层的概念,跟着文档走的时候总是有点枯燥的。

双工和转换流

stream.Duplex类

常见的双工流例子:

  • TCP sockets
  • zlib streams
  • crypto streams

stream.Transform类

转换流是属于输出和输入相关的双工流。和所有双工流一样,转换流实现了可读和可写的接口。

常见的转换流例子:

  • zlib streams
  • ccrypto streams

transform.destroy([error])

销毁stream,同事发送'error'事件。调用这个方法之后,转换流将释放内部资源。

流API

流模块的API的设计是为了使它可以通过javascript的原型继承模型来轻松实现流。

首先,一个流开发者将声明一个新的javascript类来继承基础的流类(stream.Writable, stream.Readable, stream.Duplex, stream.Transform),确保调用合适的父类构造函数。官方例子:

1
2
3
4
5
6
7
const { Writable } = require('stream')
class MyWritable extends Writable {
constructor(options) {
super(options)
// ...
}
}

这个新的流类然后必须实现一个或多个特定的方法,依赖于流的类型被创建,细节如下表:

例子 实现方法
只读 Readable _read
只写 Writable _write, _writev, _final
读和写 Duplex _read, _write, _writev, _final
操作写过的数据,然后读取 Transform _transform, _flush, _final

注意:流代码的实现不应调用流的公共方法,这是供消费者使用的。否则会导出应用程序的代码消耗流出现副作用。

简例

对于很多简单的例子,不依赖继承也能构建一个流。可以通过直接创建stream.Writablestream.Readablestream.Duplexstream.Transform对象实例来完成,并将适当的方法传递为构造函数选项。如:

1
2
3
4
5
6
const { Writable } = require('stream')
const myWritable = Writable({
write(chunk, encoding, callback) {
// ...
}
})

实现一个可写流

stream.Writable类是可写流的扩展实现。自定义的可写流必须调用new stream.Writable([options])构建函数并且实现writable._write()方法。
writable._writev()方法也同时需要被实现。

构造函数:new stream.Writable([options])

  • options
    • highWaterMark 缓冲区的水平(?)
    • decodeStrings 是否解码
    • objectMode 是否对象模式
    • write stream._write()方法实现
    • destroy stream._destroy()方法实现
    • final stream._final()方法实现

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const { Writable } = require('stream')
class MyWritable extends Writable {
constructor(options) {
// 调用 stream.Writable() 构造函数
super(options)
// ...
}
}

// 或者下面这种pre-ES6方式
const { Writable } = require('steram')
const util = require('util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) {
return new MyWritable(options)
}
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

// 或者简单构造函数方式
const { Writable } = require('stream)
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
}
})

p.s 讲真,水平太菜,这上面三种方式我只会写最后一种,不过个人认为第二种方式可以无视。

writable._write(chunk, encoding, callback)

所有的可写流实现必须提供一个writable._write()方法,将数据发送给潜在的资源。

callback方法必须给出被调用的信号,不论是完成还是失败的情况。如果调用失败,传递给callback的第一个参数必须是错误对象,如果写成功了,则必须为null

需要注意的是调用writable.write()将发生在writable.write()被调用和callabck被调用使写数据到缓冲区时。一但callback被调用,流将会发出’drain’事件。

如果要一个流能实现多个数据块处理一次,则需要实现writable._writev()方法

如果在构造函数选项中设置了decodeStrings属性,则chunk将会变成字符串类型而不是缓冲类型。并且encoding将表明字符串的字符编码。这是支持对某些字符串数据的编码优化处理。

如果decodeString显式设置为falseencoding参数可以忽略,chunk则会保持相同的对象通过.write()方法。

writable._write()方法有下划线前缀,这意味着这是被定义为内部类,不该被用户程序直接调用。

writable._writev(chunks, callback)

当一个流要实现一次处理多个数据块时需要实现此方法。

上面有提到过,文档果然很详尽和啰嗦。

writable._destroy(err, callback)

callback,一个回调函数,接受一个可选的错误参数,该参数在可写流被销毁时调用。

writable._final(callback)

完成写入任何剩余数据时调用这个函数。

这个可选函数将在流关闭之前调用,延迟finish事件,直到回调函数被调用。这个方法在流结束之前关闭资源或写缓冲数据时很有用。

错误处理

建议在writable._write()writable.writev()方法的处理过程中发生错误时,通过调用callback将错误作为第一个参数传递,这能让可写流发出error事件。

writable._write()中抛出错误,可能会引起意外结果,具体取决于流如何使用。

使用callback则保证错误处理的一致性和可预测性。

1
2
3
4
5
6
7
8
9
10
const { Writable } = require('stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
})

可写流实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const { Writable } = require('stream')
class MyWritable extends Writable {
constructor(options) {
super(options)
// ...
}

_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback
}
}
}

偷懒结语

嗯,其实后面还有满多的,包含ReadableDuplexTransform这三种流的实现。

不过文档实在太TM长了。而且内容基本上和Writable一样,基本上过了一次基本概念,那后面的这些也能很快理解。

所以,你懂的,以下省略。

Stream

坚持原创技术分享,您的支持将鼓励我继续创作!