- assert断言
- async_hooks异步钩子
- async_hooks/context异步上下文
- buffer缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process子进程
- cluster集群
- CLI命令行
- console控制台
- Corepack核心包
- crypto加密
- crypto/webcrypto网络加密
- debugger调试器
- deprecation弃用
- dgram数据报
- diagnostics_channel诊断通道
- dns域名服务器
- domain域
- Error错误
- events事件触发器
- fs文件系统
- global全局变量
- http超文本传输协议
- http2超文本传输协议2.0
- https安全超文本传输协议
- inspector检查器
- Intl国际化
- module模块
- module/cjsCommonJS模块
- module/esmECMAScript模块
- module/package包模块
- net网络
- os操作系统
- path路径
- perf_hooks性能钩子
- policy安全策略
- process进程
- punycode域名代码
- querystring查询字符串
- readline逐行读取
- repl交互式解释器
- report诊断报告
- stream流
- stream/web网络流
- string_decoder字符串解码器
- test测试
- timers定时器
- tls安全传输层
- trace_events跟踪事件
- tty终端
- url网址
- util实用工具
- v8引擎
- vm虚拟机
- wasi网络汇编系统接口
- worker_threads工作线程
- zlib压缩
Node.js v18.7.0 文档
- Node.js 18.7.0
-
►
目录
- stream 流
- 本文档的组织结构
- 流的类型
- 流消费者的 API
- 可写流
stream.Writable
类'close'
事件'drain'
事件'error'
事件'finish'
事件'pipe'
事件'unpipe'
事件writable.cork()
writable.destroy([error])
writable.closed
writable.destroyed
writable.end([chunk[, encoding]][, callback])
writable.setDefaultEncoding(encoding)
writable.uncork()
writable.writable
writable.writableAborted
writable.writableEnded
writable.writableCorked
writable.errored
writable.writableFinished
writable.writableHighWaterMark
writable.writableLength
writable.writableNeedDrain
writable.writableObjectMode
writable.write(chunk[, encoding][, callback])
- 可读流
- 两种读取模式
- 三种状态
- 选择一种接口风格
stream.Readable
类'close'
事件'data'
事件'end'
事件'error'
事件'pause'
事件'readable'
事件'resume'
事件readable.destroy([error])
readable.closed
readable.destroyed
readable.isPaused()
readable.pause()
readable.pipe(destination[, options])
readable.read([size])
readable.readable
readable.readableAborted
readable.readableDidRead
readable.readableEncoding
readable.readableEnded
readable.errored
readable.readableFlowing
readable.readableHighWaterMark
readable.readableLength
readable.readableObjectMode
readable.resume()
readable.setEncoding(encoding)
readable.unpipe([destination])
readable.unshift(chunk[, encoding])
readable.wrap(stream)
readable[Symbol.asyncIterator]()
readable.iterator([options])
readable.map(fn[, options])
readable.filter(fn[, options])
readable.forEach(fn[, options])
readable.toArray([options])
readable.some(fn[, options])
readable.find(fn[, options])
readable.every(fn[, options])
readable.flatMap(fn[, options])
readable.drop(limit[, options])
readable.take(limit[, options])
readable.asIndexedPairs([options])
readable.reduce(fn[, initial[, options]])
- 双工流与转换流
stream.finished(stream[, options], callback)
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
stream.compose(...streams)
stream.Readable.from(iterable[, options])
stream.Readable.fromWeb(readableStream[, options])
stream.Readable.isDisturbed(stream)
stream.isErrored(stream)
stream.isReadable(stream)
stream.Readable.toWeb(streamReadable[, options])
stream.Writable.fromWeb(writableStream[, options])
stream.Writable.toWeb(streamWritable)
stream.Duplex.from(src)
stream.Duplex.fromWeb(pair[, options])
stream.Duplex.toWeb(streamDuplex)
stream.addAbortSignal(signal, stream)
- 可写流
- 流实现者的 API
- 其他注意事项
- stream 流
-
►
索引
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS模块
- module/esm ECMAScript模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- policy 安全策略
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
- ► 其他版本
- 文档搜索
- 会员登录
目录
- stream 流
- 本文档的组织结构
- 流的类型
- 流消费者的 API
- 可写流
stream.Writable
类'close'
事件'drain'
事件'error'
事件'finish'
事件'pipe'
事件'unpipe'
事件writable.cork()
writable.destroy([error])
writable.closed
writable.destroyed
writable.end([chunk[, encoding]][, callback])
writable.setDefaultEncoding(encoding)
writable.uncork()
writable.writable
writable.writableAborted
writable.writableEnded
writable.writableCorked
writable.errored
writable.writableFinished
writable.writableHighWaterMark
writable.writableLength
writable.writableNeedDrain
writable.writableObjectMode
writable.write(chunk[, encoding][, callback])
- 可读流
- 两种读取模式
- 三种状态
- 选择一种接口风格
stream.Readable
类'close'
事件'data'
事件'end'
事件'error'
事件'pause'
事件'readable'
事件'resume'
事件readable.destroy([error])
readable.closed
readable.destroyed
readable.isPaused()
readable.pause()
readable.pipe(destination[, options])
readable.read([size])
readable.readable
readable.readableAborted
readable.readableDidRead
readable.readableEncoding
readable.readableEnded
readable.errored
readable.readableFlowing
readable.readableHighWaterMark
readable.readableLength
readable.readableObjectMode
readable.resume()
readable.setEncoding(encoding)
readable.unpipe([destination])
readable.unshift(chunk[, encoding])
readable.wrap(stream)
readable[Symbol.asyncIterator]()
readable.iterator([options])
readable.map(fn[, options])
readable.filter(fn[, options])
readable.forEach(fn[, options])
readable.toArray([options])
readable.some(fn[, options])
readable.find(fn[, options])
readable.every(fn[, options])
readable.flatMap(fn[, options])
readable.drop(limit[, options])
readable.take(limit[, options])
readable.asIndexedPairs([options])
readable.reduce(fn[, initial[, options]])
- 双工流与转换流
stream.finished(stream[, options], callback)
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
stream.compose(...streams)
stream.Readable.from(iterable[, options])
stream.Readable.fromWeb(readableStream[, options])
stream.Readable.isDisturbed(stream)
stream.isErrored(stream)
stream.isReadable(stream)
stream.Readable.toWeb(streamReadable[, options])
stream.Writable.fromWeb(writableStream[, options])
stream.Writable.toWeb(streamWritable)
stream.Duplex.from(src)
stream.Duplex.fromWeb(pair[, options])
stream.Duplex.toWeb(streamDuplex)
stream.addAbortSignal(signal, stream)
- 可写流
- 流实现者的 API
- 其他注意事项
stream 流#
流是用于在 Node.js 中处理流数据的抽象接口。
node:stream
模块提供了用于实现流接口的 API。
本文档的组织结构#
本文档包含两个主要章节和第三章节的注意事项。 第一章节描述了如何在应用程序中使用现有的流。 第二章节描述了如何创建新类型的流。
流的类型#
Node.js 中有四种基本的流类型:
流的 Promise API#
stream/promises
API 为返回 Promise
对象(而不是使用回调)的流提供了一组替代的异步实用函数。
API 可通过 require('node:stream/promises')
或 require('node:stream').promises
访问。
对象模式#
Node.js API 创建的所有流都只对字符串和 Buffer
(或 Uint8Array
)对象进行操作。
但是,流的实现可以使用其他类型的 JavaScript 值(除了 null
,它在流中具有特殊用途)。
这样的流被认为是在"对象模式"下运行的。
缓冲#
流消费者的 API#
几乎所有的 Node.js 应用程序,无论多么简单,都以某种方式使用流。 以下是在实现 HTTP 服务器的 Node.js 应用程序中使用流的示例:
可写流#
可写流是数据写入目标的抽象。
stream.Writable
类#
'close'
事件#
当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close'
事件。
该事件表明将不再触发更多事件,并且不会发生进一步的计算。
'drain'
事件#
如果对 stream.write(chunk)
的调用返回 false
,则 'drain'
事件将在适合继续将数据写入流时触发。
'error'
事件#
如果在写入或管道数据时发生错误,则会触发 'error'
事件。
监听器回调在调用时传入单个 Error
参数。
'finish'
事件#
在调用 stream.end()
方法之后,并且所有数据都已刷新到底层系统,则触发 'finish'
事件。
'pipe'
事件#
当在可读流上调用 stream.pipe()
方法将此可写流添加到其目标集时,则触发 'pipe'
事件。
'unpipe'
事件#
当在 Readable
流上调用 stream.unpipe()
方法时,则会触发 'unpipe'
事件,从其目标集合中删除此 Writable
。
writable.cork()
#
writable.cork()
方法强制所有写入的数据都缓存在内存中。
当调用 stream.uncork()
或 stream.end()
方法时,缓冲的数据将被刷新。
writable.destroy([error])
#
销毁流
可选地触发 'error'
事件,并且触发 'close'
事件(除非 emitClose
设置为 false
)。
在此调用之后,则可写流已结束,随后对 write()
或 end()
的调用将导致 ERR_STREAM_DESTROYED
错误。
这是销毁流的破坏性和直接的方式。
先前对 write()
的调用可能没有排空,并且可能触发 ERR_STREAM_DESTROYED
错误。
如果数据应该在关闭之前刷新,或者在销毁流之前等待 'drain'
事件,则使用 end()
而不是销毁。
writable.closed
#
触发 'close'
之后为 true
。
writable.destroyed
#
在调用 writable.destroy()
之后是 true
。
writable.end([chunk[, encoding]][, callback])
#
调用 writable.end()
方法表示不再有数据写入 Writable
。
可选的 chunk
和 encoding
参数允许在关闭流之前立即写入最后一个额外的数据块。
writable.setDefaultEncoding(encoding)
#
writable.setDefaultEncoding()
方法为 Writable
流设置默认的 encoding
。
writable.uncork()
#
writable.uncork()
方法会刷新自调用 stream.cork()
以来缓冲的所有数据。
writable.writable
#
如果调用 writable.write()
是安全的,则为 true
,这意味着流没有被销毁、出错或结束。
writable.writableAborted
#
返回在触发 'finish'
之前流是被破销毁或出错。
writable.writableEnded
#
在调用 writable.end()
之后是 true
。
此属性不指示数据是否已刷新,为此则使用 writable.writableFinished
代替。
writable.writableCorked
#
需要调用 writable.uncork()
以完全解开流的次数。
writable.errored
#
如果流因错误而被销毁,则返回错误。
writable.writableFinished
#
在触发 'finish'
事件之前立即设置为 true
。
writable.writableHighWaterMark
#
返回创建此 Writable
时传入的 highWaterMark
的值。
writable.writableLength
#
此属性包含队列中准备写入的字节数(或对象数)。
该值提供有关 highWaterMark
状态的内省数据。
writable.writableNeedDrain
#
如果流的缓冲区已满并且流将触发 'drain'
,则为 true
。
writable.writableObjectMode
#
给定 Writable
流的属性 objectMode
的获取器。
writable.write(chunk[, encoding][, callback])
#
writable.write()
方法将一些数据写入流,并在数据完全处理后调用提供的 callback
。
如果发生错误,则 callback
将使用错误作为其第一个参数进行调用。
callback
是异步地调用,并且在 'error'
触发之前。
可读流#
可读流是对被消费的数据的来源的抽象。
两种读取模式#
Readable
流以两种模式之一有效地运行:流动和暂停。
这些模式与对象模式是分开的。
Readable
流可以处于或不处于对象模式,无论其是处于流动模式还是暂停模式。
三种状态#
Readable
流的操作的"两种模式"是对 Readable
流实现中发生的更复杂的内部状态管理的简化抽象。
选择一种接口风格#
Readable
流的 API 跨越多个 Node.js 版本的演进,并提供了多种消费流数据的方法。
一般情况下,开发者应该选择其中一种消费数据的方式,切忌使用多种方式消费单一流中的数据。
具体来说,使用 on('data')
、on('readable')
、pipe()
或异步迭代器的组合可能会导致不直观的行为。
stream.Readable
类#
'close'
事件#
当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close'
事件。
该事件表明将不再触发更多事件,并且不会发生进一步的计算。
'data'
事件#
每当流将数据块的所有权移交给消费者时,则会触发 'data'
事件。
每当通过调用 readable.pipe()
、readable.resume()
、或通过将监听器回调绑定到 'data'
事件而将流切换到流动模式时,就会发生这种情况。
每当调用 readable.read()
方法并且可以返回数据块时,也会触发 'data'
事件。
'end'
事件#
当流中没有更多数据可供消费时,则会触发 'end'
事件。
'error'
事件#
'error'
事件可以随时由 Readable
的实现触发。
通常,如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能会发生这种情况。
'pause'
事件#
当调用 stream.pause()
并且 readableFlowing
不是 false
时,则会触发 'pause'
事件。
'readable'
事件#
当有可从流中读取的数据或已到达流的末尾时,则将触发 'readable'
事件。
实际上,'readable'
事件表明流有新的信息。
如果数据可用,则 stream.read()
将返回该数据。
'resume'
事件#
当调用 stream.resume()
并且 readableFlowing
不是 true
时,则会触发 'resume'
事件。
readable.destroy([error])
#
销毁流
可选地触发 'error'
事件,并且触发 'close'
事件(除非 emitClose
设置为 false
)。
在此调用之后,可读流将释放任何内部资源,随后对 push()
的调用将被忽略。
readable.closed
#
触发 'close'
之后为 true
。
readable.destroyed
#
在调用 readable.destroy()
之后是 true
。
readable.isPaused()
#
readable.isPaused()
方法返回 Readable
的当前运行状态。
这主要由作为 readable.pipe()
方法基础的机制使用。
在大多数典型情况下,没有理由直接使用此方法。
readable.pause()
#
readable.pause()
方法将导致处于流动模式的流停止触发 'data'
事件,切换出流动模式。
任何可用的数据都将保留在内部缓冲区中。
readable.pipe(destination[, options])
#
readable.pipe()
方法将 Writable
流绑定到 readable
,使其自动切换到流动模式并将其所有数据推送到绑定的 Writable
。
数据流将被自动管理,以便目标 Writable
流不会被更快的 Readable
流漫过。
readable.read([size])
#
readable.read()
方法从内部缓冲区中读取数据并返回。
如果没有数据可以读取,则返回 null
。
默认情况下,除非使用 readable.setEncoding()
方法指定了编码或流在对象模式下运行,否则数据将作为 Buffer
对象返回。
readable.readable
#
如果调用 readable.read()
是安全的,则为 true
,这意味着流尚未被销毁或触发 'error'
或 'end'
。
readable.readableAborted
#
返回在触发 'end'
之前流是被破销毁或出错。
readable.readableDidRead
#
返回是否已触发 'data'
。
readable.readableEncoding
#
给定 Readable
流的属性 encoding
的获取器。
可以使用 readable.setEncoding()
方法设置 encoding
属性。
readable.readableEnded
#
当触发 'end'
事件时变为 true
。
readable.errored
#
如果流因错误而被销毁,则返回错误。
readable.readableFlowing
#
此属性反映了 Readable
流的当前状态,如三种状态章节所述。
readable.readableHighWaterMark
#
返回创建此 Readable
时传入的 highWaterMark
的值。
readable.readableLength
#
此属性包含队列中准备读取的字节数(或对象数)。
该值提供有关 highWaterMark
状态的内省数据。
readable.readableObjectMode
#
给定 Readable
流的属性 objectMode
的获取器。
readable.resume()
#
readable.resume()
方法使被显式暂停的 Readable
流恢复触发 'data'
事件,将流切换到流动模式。
readable.setEncoding(encoding)
#
readable.setEncoding()
方法为从 Readable
流读取的数据设置字符编码。
readable.unpipe([destination])
#
readable.unpipe()
方法分离先前使用 stream.pipe()
方法绑定的 Writable
流。
readable.unshift(chunk[, encoding])
#
将 chunk
作为 null
传入信号流结束 (EOF),其行为与 readable.push(null)
相同,之后无法写入更多数据。
EOF 信号放在缓冲区的末尾,任何缓冲的数据仍将被刷新。
readable.wrap(stream)
#
在 Node.js 0.10 之前,流没有实现当前定义的整个 node:stream
模块 API。
(有关更多信息,请参阅兼容性。)
readable[Symbol.asyncIterator]()
#
如果循环以 break
、return
或 throw
终止,则流将被销毁。
换句话说,遍历流将完全消费流。
流将以大小等于 highWaterMark
选项的块读取。
在上面的代码示例中,如果文件的数据少于 64 KiB,则数据将位于单个块中,因为没有为 fs.createReadStream()
提供 highWaterMark
选项。
readable.iterator([options])
#
如果 for await...of
循环由 return
、break
或 throw
退出,或者如果流在迭代期间发出错误,迭代器是否应该销毁流,则此方法创建的迭代器为用户提供了取消流销毁的选项。
readable.map(fn[, options])
#
此方法允许映射流。
将为流中的每个块调用 fn
函数。
如果 fn
函数返回 promise,则该 promise 将在被传到结果流之前被 await
。
readable.filter(fn[, options])
#
此方法允许过滤流。
对于流中的每个块,将调用 fn
函数,如果它返回真值,则该块将被传给结果流。
如果 fn
函数返回 promise,则该 Promise 将被 await
。
readable.forEach(fn[, options])
#
此方法允许迭代流。
对于流中的每个块,将调用 fn
函数。
如果 fn
函数返回 promise,则该 Promise 将被 await
。
readable.toArray([options])
#
此方法可以轻松获取流的内容。
readable.some(fn[, options])
#
此方法类似于 Array.prototype.some
,并在流中的每个块上调用 fn
,直到等待的返回值为 true
(或任何真值)。
一旦对块等待返回值的 fn
调用为真,则流将被销毁,并使用 true
履行 promise。
如果对块的 fn
调用都没有返回真值,则 promise 使用 false
履行。
readable.find(fn[, options])
#
此方法类似于 Array.prototype.find
,并在流中的每个块上调用 fn
以查找具有 fn
的真值的块。
一旦 fn
调用的等待返回值为真值,则流就会被销毁,并且 promise 使用 fn
返回真值的值来履行。
如果对块的所有 fn
调用都返回非真值,则 promise 使用 undefined
履行。
readable.every(fn[, options])
#
此方法类似于 Array.prototype.every
,并在流中的每个块上调用 fn
以检查所有等待的返回值是否为 fn
的真值。
一旦对块等待返回值的 fn
调用是非真的,则流就会被销毁,并且 promise 会使用 false
履行
如果对块的所有 fn
调用都返回真值,则该 promise 使用 true
履行。
readable.flatMap(fn[, options])
#
此方法通过将给定的回调应用到流的每个块然后展平结果来返回新的流。
readable.drop(limit[, options])
#
此方法返回新的流,其前 limit
个块被丢弃。
readable.take(limit[, options])
#
此方法返回带有前 limit
个块的新流。
readable.asIndexedPairs([options])
#
此方法返回新的流,其中包含与 [index, chunk]
形式的计数器配对的底层流块。
第一个索引值为 0,每产生一个块,则增加 1。
readable.reduce(fn[, initial[, options]])
#
此方法按顺序在流的每个块上调用 fn
,并将对前一个元素的计算结果传给它。
它返回减数最终值的 promise。
双工流与转换流#
stream.Duplex
类#
双工流是同时实现 Readable
和 Writable
接口的流。
duplex.allowHalfOpen
#
如果为 false
,则当可读端结束时,流将自动结束可写端。
最初由 allowHalfOpen
构造函数选项设置,默认为 true
。
stream.Transform
类#
转换流是 Duplex
流,其中输出以某种方式与输入相关。
与所有 Duplex
流一样,Transform
流实现了 Readable
和 Writable
接口。
transform.destroy([error])
#
销毁流,并可选择地触发 'error'
事件。
在此调用之后,转换流将释放任何内部资源。
实现者不应覆盖此方法,而应实现 readable._destroy()
。
Transform
的 _destroy()
的默认实现也会触发 'close'
,除非 emitClose
设置为 false。
stream.finished(stream[, options], callback)
#
当流不再可读、可写或遇到错误或过早关闭事件时获得通知的函数。
stream.pipeline(source[, ...transforms], destination, callback)
#
stream.pipeline(streams, callback)
#
模块方法,用于在流和生成器之间进行管道转发错误并正确清理并在管道完成时提供回调。
stream.compose(...streams)
#
将两个或多个流组合成一个 Duplex
流,其写入第一个流并从最后一个流读取。
每个提供的流都通过管道传输到下一个,使用 stream.pipeline
。
如果任何流错误,则所有流都将被销毁,包括外部的 Duplex
流。
stream.Readable.from(iterable[, options])
#
一个从迭代器中创建可读流的实用方法。
stream.Readable.fromWeb(readableStream[, options])
#
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回: <stream.Readable>
stream.Readable.isDisturbed(stream)
#
返回流是否已被读取或取消。
stream.isErrored(stream)
#
返回流是否遇到错误。
stream.isReadable(stream)
#
返回流是否可读。
stream.Readable.toWeb(streamReadable[, options])
#
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number>size
<Function>
- 返回: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
#
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回: <stream.Writable>
stream.Writable.toWeb(streamWritable)
#
streamWritable
<stream.Writable>- 返回: <WritableStream>
stream.Duplex.from(src)
#
创建双工流的实用方法。
stream.Duplex.fromWeb(pair[, options])
#
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>- 返回: <stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
}
});
const pair = {
readable,
writable
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}
const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
}
});
const pair = {
readable,
writable
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)
#
streamDuplex
<stream.Duplex>- 返回: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
}
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
}
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)
#
将中止信号绑定到可读或可写的流。
这让代码可以使用 AbortController
来控制流销毁。
流实现者的 API#
node:stream
模块 API 旨在使使用 JavaScript 的原型继承模型轻松实现流成为可能。
简单的实现#
对于许多简单的情况,可以在不依赖继承的情况下创建流。
这可以通过直接创建 stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
对象的实例并传入适当的方法作为构造函数选项来实现。
实现可写流#
stream.Writable
类被扩展以实现 Writable
流。
new stream.Writable([options])
#
或者,当使用 ES6 之前的风格构造函数时:
writable._construct(callback)
#
不得直接调用 _construct()
方法。
它可以由子类实现,如果是,则只能由内部 Writable
类方法调用。
writable._write(chunk, encoding, callback)
#
所有 Writable
流实现都必须提供 writable._write()
和/或 writable._writev()
方法来将数据发送到底层资源。
writable._writev(chunks, callback)
#
此函数不得由应用程序代码直接调用。
它应该由子类实现,并且只能由内部 Writable
类方法调用。
writable._destroy(err, callback)
#
_destroy()
方法由 writable.destroy()
调用。
它可以被子类覆盖,但不能直接调用。
此外,callback
不应与 async/await 混合,一旦它在 promise 被解决时执行。
writable._final(callback)
#
不得直接调用 _final()
方法。
它可以由子类实现,如果是,则只能由内部 Writable
类方法调用。
写入时出错#
在 writable._write()
、writable._writev()
和 writable._final()
方法的处理过程中发生的错误必须通过调用回调并将错误作为第一个参数传入来传播。
从这些方法中抛出 Error
或手动触发 'error'
事件会导致未定义的行为。
可写流的示例#
下面说明了一个相当简单(有些毫无意义)的自定义 Writable
流的实现。
虽然这个特定的 Writable
流实例没有任何真正的特殊用途,但该示例说明了自定义 Writable
流实例的每个必需元素:
在可写流中解码缓冲区#
解码缓冲区是一项常见任务,例如,在使用输入为字符串的转换器时。
在使用多字节字符编码(例如 UTF-8)时,这不是一个简单的过程。
以下示例显示如何使用 StringDecoder
和 Writable
解码多字节字符串。
实现可读流#
stream.Readable
类被扩展以实现 Readable
流。
new stream.Readable([options])
#
或者,当使用 ES6 之前的风格构造函数时:
readable._construct(callback)
#
不得直接调用 _construct()
方法。
它可以由子类实现,如果是,则只能由内部 Readable
类方法调用。
readable._read(size)
#
此函数不得由应用程序代码直接调用。
它应该由子类实现,并且只能由内部 Readable
类方法调用。
readable._destroy(err, callback)
#
_destroy()
方法由 readable.destroy()
调用。
它可以被子类覆盖,但不能直接调用。
readable.push(chunk[, encoding])
#
当 chunk
为 Buffer
、Uint8Array
或 string
时,数据的 chunk
将被添加到内部队列中供流的用户消费。
将 chunk
作为 null
传递信号表示流结束 (EOF),之后不能再写入数据。
读取时出错#
readable._read()
处理过程中发生的错误必须通过 readable.destroy(err)
方法传播。
从 readable._read()
中抛出 Error
或手动触发 'error'
事件会导致未定义的行为。
可读流的示例#
下面是一个 Readable
流的基本示例,它按升序触发从 1 到 1,000,000 的数字,然后结束。
实现双工流#
Duplex
流是同时实现 Readable
和 Writable
的流,例如 TCP 套接字连接。
new stream.Duplex(options)
#
或者,当使用 ES6 之前的风格构造函数时:
双工流的例子#
下面说明了一个简单的 Duplex
流的示例,它封装了一个假设的低层源对象,可以向其中写入数据,也可以从中读取数据,尽管使用的 API 与 Node.js 流不兼容。
下面说明了一个简单的 Duplex
流的示例,它缓冲通过 Writable
接口传入的写入数据,然后通过 Readable
接口读回。
对象模式的双工流#
对于 Duplex
流,可以分别使用 readableObjectMode
和 writableObjectMode
选项为 Readable
或 Writable
侧专门设置 objectMode
。
实现转换流#
Transform
流是 Duplex
流,其中输出以某种方式从输入计算。
示例包括压缩、加密、或解密数据的压缩流或加密流。
new stream.Transform([options])
#
或者,当使用 ES6 之前的风格构造函数时:
'end'
事件#
'end'
事件来自 stream.Readable
类。
'end'
事件在所有数据输出后触发,该事件发生在调用 transform._flush()
中的回调之后。
在出现错误的情况下,不应触发 'end'
。
'finish'
事件#
'finish'
事件来自 stream.Writable
类。
'finish'
事件在调用 stream.end()
并且所有块都已被 stream._transform()
处理后触发。
在出现错误的情况下,不应触发 'finish'
。
transform._flush(callback)
#
此函数不得由应用程序代码直接调用。
它应该由子类实现,并且只能由内部 Readable
类方法调用。
transform._transform(chunk, encoding, callback)
#
此函数不得由应用程序代码直接调用。
它应该由子类实现,并且只能由内部 Readable
类方法调用。
stream.PassThrough
类#
stream.PassThrough
类是 Transform
流的简单实现,它只是将输入字节传到输出。
它的目的主要是用于示例和测试,但在某些用例中,stream.PassThrough
可用作新型流的构建块。
其他注意事项#
流与异步生成器和异步迭代器的兼容性#
在 JavaScript 中异步生成器和迭代器的支持下,异步生成器在这一点上实际上是一流的语言级流构造。
使用异步迭代器消费可读流#
异步迭代器在流上注册一个永久的错误句柄,以防止任何未处理的销毁后错误。
使用异步生成器创建可读流#
可以使用 Readable.from()
实用方法从异步生成器创建 Node.js 可读流:
从异步迭代器管道到可写流#
当从异步迭代器写入可写流时,确保正确处理背压和错误。
stream.pipeline()
抽象了背压和背压相关错误的处理:
兼容旧版本的 Node.js#
在 Node.js 0.10 之前,Readable
流接口更简单,但功能更弱,实用性也更低。
readable.read(0)
#
在某些情况下,需要触发底层可读流机制的刷新,而不实际消耗任何数据。
在这种情况下,可以调用 readable.read(0)
,它总是返回 null
。
readable.push('')
#
不推荐使用 readable.push('')
。
调用 `readable.setEncoding()` 之后 `highWaterMark` 的差异#
readable.setEncoding()
的使用将改变 highWaterMark
在非对象模式下的操作方式。