Node.js 设计模式笔记 —— Streams 流编程

Streams 是 Node.js 的组件和模式中最重要的几个之一。在 Node.js 这类基于 event 的平台上,最高效的实时地处理 I/O 的方式,就是当有输入时就立即接收数据,应用产生输出时就立即发送数据。

Buffering vs streaming

对于输入数据的处理,buffer 模式会将来自资源的所有数据收集到 buffer 中,待操作完成再将数据作为单一的 blob of data 传递给调用者;相反地,streams 允许我们一旦接收到数据就立即对其进行处理。
单从效率上说,streams 在空间(内存使用)和时间(CPU 时钟)的使用上都更加高效。此外 Node.js 中的 streams 还有另一个重要的优势:组合性

空间效率

使用 buffered API 完成 Gzip 压缩:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import {promises as fs} from 'fs'
import {gzip} from 'zlib'
import {promisify} from 'util'

const gzipPromise = promisify(gzip)
const filename = process.argv[2]

async function main() {
const data = await fs.readFile(filename)
const gzippedData = await gzipPromise(data)
await fs.writeFile(`${filename}.gz`, gzippedData)
console.log('File successfully compressed')
}

main()

node gzip-buffer.js <path to file>

如果我们使用上述代码压缩一个足够大的文件(比如说 8G),我们很有可能会收到一个错误信息,类似文件大小超过了允许的最大 buffer 大小。

1
2
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes

即便没有超过 V8 的 buffer 大小限制,也有可能出现物理内存不够用的情况。

使用 streams 实现 Gzip 压缩:

1
2
3
4
5
6
7
8
9
import {createReadStream, createWriteStream} from 'fs'
import {createGzip} from 'zlib'

const filename = process.argv[2]

createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'))

streams 的优势来自于其接口和可组合性,允许我们实现干净、优雅、简洁的代码。对于此处的示例,它可以对任意大小的文件进行压缩,只需要消耗常量的内存。

时间效率

假设我们需要创建一个应用,能够压缩一个文件并将其上传到一个远程的 HTTP 服务器。而服务器端则负责将接收到的文件解压缩并保存。
如果我们使用 buffer API 实现客户端组件,则只有当整个文件读取和压缩完成之后,上传操作才开始触发。同时在服务器端,也只有当所有数据都接收完毕之后才开始解压缩操作。

更好一些的方案是使用 streams。在客户端,streams 允许我们以 chunk 为单位从文件系统逐个、分段地读取数据,并立即进行压缩和发送。同时在服务器端,每个 chunk 被接收到后会立即进行解压缩。

服务端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'

const server = createServer((req, res) => {
const filename = basename(req.headers['x-filename'])
const destFilename = join('received_files', filename)
console.log(`File request received: ${filename}`)
req
.pipe(createGunzip())
.pipe(createWriteStream(destFilename))
.on('finish', () => {
res.writeHead(201, {'Content-Type': 'text/plain'})
res.end('OK\n')
console.log(`File saved: ${destFilename}`)
})
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

客户端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]

const httpRequestOptions = {
hostname: serverHost,
port: 3000,
path: '/',
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip',
'X-Filename': basename(filename)
}
}

const req = request(httpRequestOptions, (res) => {
console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
.pipe(createGzip())
.pipe(req)
.on('finish', () => {
console.log('File successfully sent')
})

mkdir received_files
node gzip-receive.js
node gzip-send.js <path to file> localhost

借助 streams,整套流程的流水线在我们接收到第一个数据块的时候就开始启动了,完全不需要等待整个文件被读取。除此之外,下一个数据块能够被读取时,不需要等到之前的任务完成就能被处理。即另一条流水线被并行地被装配执行,Node.js 可以将这些异步的任务并行化地执行。只需要保证数据块最终的顺序是固定的,而 Node.js 中 streams 的内部实现机制保证了这一点。

组合性

借助于 pipe() 方法,不同的 stream 能够被组合在一起。每个处理单元负责各自的单一功能,最终被 pipe() 连接起来。因为 streams 拥有统一的接口,它们彼此之间在 API 层面是互通的。只需要 pipeline 支持前一个 stream 生成的数据类型(可以是二进制、纯文本甚至对象等)。

客户端加密
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import {createCipheriv, randomBytes} from 'crypto'
import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]
const secret = Buffer.from(process.argv[4], 'hex')
const iv = randomBytes(16)

const httpRequestOptions = {
hostname: serverHost,
port: 3000,
path: '/',
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip',
'X-Filename': basename(filename),
'X-Initialization-Vector': iv.toString('hex')
}
}

const req = request(httpRequestOptions, (res) => {
console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
.pipe(createGzip())
.pipe(createCipheriv('aes192', secret, iv))
.pipe(req)
.on('finish', () => {
console.log('File successfully sent')
})
服务端加密
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'
import {createDecipheriv, randomBytes} from 'crypto'

const secret = randomBytes(24)
console.log(`Generated secret: ${secret.toString('hex')}`)

const server = createServer((req, res) => {
const filename = basename(req.headers['x-filename'])
const iv = Buffer.from(
req.headers['x-initialization-vector'], 'hex'
)
const destFilename = join('received_files', filename)
console.log(`File request received: ${filename}`)
req
.pipe(createDecipheriv('aes192', secret, iv))
.pipe(createGunzip())
.pipe(createWriteStream(destFilename))
.on('finish', () => {
res.writeHead(201, {'Content-Type': 'text/plain'})
res.end('OK\n')
console.log(`File saved: ${destFilename}`)
})
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

Streams 详解

实际上在 Node.js 中的任何地方都可见到 streams。比如核心模块 fs 有 createReadStream() 方法用来读取文件内容,createWriteStream() 方法用来向文件写入数据;HTTP requestresponse 对象本质上也是 stream;zlib 模块允许我们通过流接口压缩和解压缩数据;甚至 crypto 模块也提供了一些有用的流函数比如 createCipherivcreateDecipheriv

streams 的结构

Node.js 中的每一个 stream 对象,都是对以下四种虚拟基类里任意一种的实现,这四个虚拟类都属于 stream 核心模块:

  • Readable
  • Writable
  • Duplex
  • Transform

每一个 stream 类同时也是 EventEmitter 的实例,实际上 Streams 可以生成几种类型的 event。比如当一个 Readable 流读取完毕时触发 end 事件,Writable 流吸入完毕时触发 finish 事件,或者当任意错误发生时抛出 error

Steams 之所以足够灵活,一个重要的原因就是它们不仅仅能够处理 binary data,还支持几乎任意的 JavaScript 值。实际上 streams 有以下两种操作模式:

  • Binary mode:以 chunk 的形式(比如 buffers 或 strings)传输数据
  • Object mode:通过由独立对象(可以包含任意 JavaScript 值)组成的序列传输数据

上述两种模式使得我们不仅仅可以利用 streams 处理 I/O 操作,还能够帮助我们以函数式的方式将多个处理单元优雅地组合起来。

从 Readable streams 读取数据

non-flowing mode

默认模式。readable 事件表示有新的数据可供读取,再通过 read() 方法同步地从内部 buffer 读取数据,返回一个 Buffer 对象。
即从 stream 按需拉取数据。当 stream 以 Binary 模式工作时,我们还可以给 read() 方法指定一个 size 值,以读取特定数量的数据。

1
2
3
4
5
6
7
8
9
10
11
process.stdin
.on('readable', () => {
let chunk
console.log('New data available')
while ((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
})
.on('end', () => console.log('End of stream'))

flowing mode

此模式下,数据并不会像之前那样通过 read() 方法拉取,而是一旦有数据可用,就主动推送给 data 事件的 listener。flowing 模式对于数据流的控制,相对而言灵活性较低一些。
由于默认是 non-flowing 模式,为了使用 flowing 模式,需要绑定一个 listener 给 data 事件或者显式地调用 resume() 方法。调用 pause() 方法会导致 stream 暂时停止发送 data 事件,任何传入的数据会先被缓存到内部 buffer。即 stream 又切换回 non-flowing 模式。

1
2
3
4
5
6
7
8
9
10
11
process.stdin
.on('readable', () => {
let chunk
console.log('New data available')
while ((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
})
.on('end', () => console.log('End of stream'))

Async iterators

Readable 流同时也是 async iterators。

1
2
3
4
5
6
7
8
9
10
11
async function main() {
for await (const chunk of process.stdin) {
console.log('New data available')
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
console.log('End of stream')
}

main()

实现 Readable streams

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import {Readable} from 'stream'
import Chance from 'chance'

const chance = Chance()

export class RandomStream extends Readable {
constructor(options) {
super(options)
this.emittedBytes = 0
}

_read(size) {
const chunk = chance.string({length: size})
this.push(chunk, 'utf8')
this.emittedBytes += chunk.length
if (chance.bool({likelihood: 5})) {
this.push(null)
}
}
}

const randomStream = new RandomStream()
randomStream
.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
})

为了实现一个自定义的 Readable stream,首先必须创建一个新的类,该类继承自 stream 模块中的 Readable。其次新创建的类中必须包含 _read() 方法的实现。
上面代码中的 _read() 方法做了以下几件事:

  • 借助第三方的 chance 模块,生成一个长度为 size 的随机字符串
  • 通过 push() 方法将字符传推送到内部 buffer
  • 依据 5% 的几率自行终止,终止时推送 null 到内部 buffer,作为 stream 的结束标志

简化版实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import {Readable} from 'stream'
import Chance from 'chance'

const chance = new Chance()
let emittedBytes = 0

const randomStream = new Readable({
read(size) {
const chunk = chance.string({length: size})
this.push(chunk, 'utf8')
emittedBytes += chunk.length
if (chance.bool({likelihood: 5})) {
this.push(null)
}
}
})

randomStream
.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
})

从可迭代对象创建 Readable streams

Readable.from() 方法支持从数组或者其他可迭代对象(比如 generators, iterators, async iterators)创建 Readable streams。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import {Readable} from 'stream'

const mountains = [
{name: 'Everest', height: 8848},
{name: 'K2', height: 8611},
{name: 'Kangchenjunga', height: 8586},
{name: 'Lhotse', height: 8516},
{name: 'Makalu', height: 8481}
]

const mountainsStream = Readable.from(mountains)
mountainsStream.on('data', (mountain) => {
console.log(`${mountain.name.padStart(14)}\t${mountain.height}m`)
})

Writable streams

向流写入数据

write() 方法可以向 Writable stream 写入数据。
writable.write(chunk, [encoding], [callback])

end() 方法可以向 stream 表明没有更多的数据需要写入。
writable.end([chunk], [encoding], [callback])

callback 回调函数等同于为 finish 事件注册了一个 listener,会在流中写入的所有数据刷新到底层资源中时触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import {createServer} from 'http'
import Chance from 'chance'

const chance = new Chance()
const server = createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'})
while (chance.bool({likelihood: 95})) {
res.write(`${chance.string()}\n`)
}
res.end('\n\n')
res.on('finish', () => console.log('All data sent'))
})

server.listen(8080, () => {
console.log('listening on http://localhost:8080')
})

上面代码中 HTTP 服务里的 res 对象是一个 http.ServerResponse 对象,实际上也是一个 Writable stream。

实现 Writable stream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import {Writable} from 'stream'
import {promises as fs} from 'fs'

class ToFileStream extends Writable {
constructor(options) {
super({...options, objectMode: true})
}

_write(chunk, encoding, cb) {
fs.writeFile(chunk.path, chunk.content)
.then(() => cb())
.catch(cb)
}
}

const tfs = new ToFileStream()

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

简化形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import {Writable} from 'stream'
import {promises as fs} from 'fs'

const tfs = new Writable({
objectMode: true,
write(chunk, encoding, cb) {
fs.writeFile(chunk.path, chunk.content)
.then(() => cb())
.catch(cb)
}
})

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

Duplex streams

Duplex 流,既 Readable 又 Writable 的流。它的场景在于,有时候我们描述的实体既是数据源,也是数据的接收者,比如网络套接字。
Duplex 流同时继承来着 stream.Readablestream.Writable 的方法。
为了创建一个自定义的 Duplex 流,我们必须同时提供 _read()_write() 的实现。

Transform streams

Transform 流是一种特殊类型的 Duplex 流,主要针对数据的转换。
对于 Duplex 流来说,流入和流出的数据之间并没有直接的联系。比如一个 TCP 套接字,只是从远端接收或者发送数据,套接字本身不知晓输入输出之间的任何关系。

Duplex stream

而 Transform 流则会对收到的每一段数据都应用某种转换操作,从 Writable 端接收数据,进行某种形式地转换后再通过 Readable 端提供给外部。

Transform stream

实现 Transform 流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import {Transform} from 'stream'

class ReplaceStream extends Transform {
constructor(searchStr, replaceStr, options) {
super({...options})
this.searchStr = searchStr
this.replaceStr = replaceStr
this.tail = ''
}

_transform(chunk, encoding, callback) {
const pieces = (this.tail + chunk).split(this.searchStr)
const lastPiece = pieces[pieces.length - 1]
const tailLen = this.searchStr.length - 1
this.tail = lastPiece.slice(-tailLen)
pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(this.replaceStr))
callback()
}

_flush(callback) {
this.push(this.tail)
callback()
}
}


const replaceStream = new ReplaceStream('World', 'Node.js')
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

其中核心的 _transform() 方法,其有着和 Writable 流的 _write() 方法基本一致的签名,但并不会将处理后的数据写入底层资源,而是通过 this.push() 推送给内部 buffer,正如 Readable 流中 _read() 方法的行为。
所以形成了 Transform 流整体上接收、转换、发送的行为。
_flush() 则会在流结束前调用。

简化形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import {Transform} from 'stream'

const searchStr = 'World'
const replaceStr = 'Node.js'
let tail = ''

const replaceStream = new Transform({
defaultEncoding: 'utf-8',

transform(chunk, encoding, cb) {
const pieces = (tail + chunk).split(searchStr)
const lastPiece = pieces[pieces.length - 1]
const tailLen = searchStr.length - 1
tail = lastPiece.slice(-tailLen)
pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(replaceStr))
cb()
},
flush(cb) {
this.push(tail)
cb()
}
})
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

Transform 流筛选和聚合数据

数据源 data.csv

1
2
3
4
5
6
7
8
type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54

package.json:

1
2
3
4
5
6
7
8
9
10
11
{
"type": "module",
"main": "index.js",
"dependencies": {
"csv-parse": "^4.10.1"
},
"engines": {
"node": ">=14"
},
"engineStrict": true
}

FilterByCountry Transform 流 filter-by-country.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import {Transform} from 'stream'

export class FilterByCountry extends Transform {
constructor(country, options = {}) {
options.objectMode = true
super(options)
this.country = country
}

_transform(record, enc, cb) {
if (record.country === this.country) {
this.push(record)
}
cb()
}
}

SumProfit Transform 流 sum-profit.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import {Transform} from 'stream'

export class SumProfit extends Transform {
constructor(options = {}) {
options.objectMode = true
super(options)
this.total = 0
}

_transform(record, enc, cb) {
this.total += Number.parseFloat(record.profit)
cb()
}

_flush(cb) {
this.push(this.total.toString())
cb()
}
}

index.js

1
2
3
4
5
6
7
8
9
10
11
12
import {createReadStream} from 'fs'
import parse from 'csv-parse'
import {FilterByCountry} from './filter-by-conutry.js'
import {SumProfit} from './sum-profit.js'

const csvParser = parse({columns: true})

createReadStream('data.csv')
.pipe(csvParser)
.pipe(new FilterByCountry('Italy'))
.pipe(new SumProfit())
.pipe(process.stdout)

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition