Node.js 设计模式笔记 —— 由 Promises 和 Async、Await 实现的异步模式

回调函数(Callbacks)是 Node.js 中异步编程的底层构件,但它们远远达不到对用户友好的程度。对于实现代码中最常见的串行控制流,一个未经训练的开发者很容易陷入到 callback hell 问题中。即便实现是正确的,该串行控制流也会显得不必要的复杂和脆弱。

为了获得更好的异步编程体验,第一个出现的就是 promise,一种保存了异步操作的状态和最终结果的对象。Promise 可以轻易地被串联起来,实现串行控制流,可以像其他任何对象一样自由地转移。Pormise 大大简化了异步代码,后来在此基础上又有了 asyncawait,能够令异步代码看起来就像是同步代码一样。

Promises

Promises 是 ECMAScript 2015 标准(ES6)的一部分,为传递异步结果提供了一种健壮的解决方案,替代原本的 CPS 样式的回调函数。Promise 能够令所有主要的异步控制流更加易读、简洁和健壮。

Promise 是一种用来代表异步操作的最终结果(或错误)的对象。在专业术语中,当异步操作未完成时,我们称 Promise 是 pending 的;当异步操作成功结束时,Promise 是 fulfilled 的;当异步操作因为错误终止时,Promise 是 rejected 的;当 Promise 或者是 fulfilled 或者是 rejected,则将其认定为 settled

Promise 对象的 then() 方法可以获取成功执行后的结果或者终止时报出的错误:

1
promise.then(onFulfilled, onRejected)

其中 onFulfilled 是一个回调函数,最终会接收到 Promise 成功时的值;onRejected是另一个回调函数,最终会接收 Promise 异常终止时的值(如果有的话)。

基于回调函数的如下代码:

1
2
3
4
5
6
asyncOperation(arg, (err, result) => {
if (err) {
// handle the error
}
// do stuff with the result
})

Promise 实现上述同样的功能,则更加优雅、结构化:

1
2
3
4
5
6
asyncOperationPromise(arg)
.then(result => {
// do stuff with result
}, err => {
// handle the error
})

asyncOperationPromise() 会返回一个 Promise,可以被用来获取最终结果的值或者失败的原因。但最为关键的属性是,then() 方法会同步地返回另一个 Promise。
更进一步地,如果 onFulfilled 或者 onRejected 函数返回一个值 x,那么 then() 方法返回的 Promise 会有以下行为:

  • x 是一个值,则 then() 返回的 Promise 使用 x 作为自身完成时的值
  • x 是一个 Promise 且成功完成,则 x 完成时返回的值作为 then() 返回的 Promise 完成时的值
  • x 是一个 Promise 且因为错误终止,则 x 终止的原因作为 then() 返回的 Promise 终止的原因

上述行为能够令我们将多个 promise 连接成链,轻松地将异步操作聚合在一起。如果我们没有指定一个 onFulfilled 或者 onRejected handler,Promise 完成时的值或者终止时的原因都会自动地传递给链条中的下一个 Promise。通过 Promise 链,任务的执行顺序突然变得很简单。

1
2
3
4
5
6
7
8
9
10
11
12
asyncOperationPromise(arg)
.then(result1 => {
// return another promise
return asyncOperationPromise(arg2)
})
.then(result2 => {
// return a value
return 'done'
})
.then(undefined, err => {
// any error in the chain is caught here
})

promise API

Promise 构造函数(new Promise((resolve, reject) => {}))会创建一个新的 Promise 实例,其完成还是终止取决于作为参数传入的函数的行为。
作为参数传入的函数接收如下两个参数:

  • resolve(obj):resolve 是一个函数,在调用时为 Promise 提供完成时的值。当 obj 是值时,则 obj 本身作为 Promise 完成时的值;当 obj 是另一个 Promise 时,则 obj 完成时的值作为当前 Promise 完成时的值
  • reject(err):Promise 因为 err 终止
1
2
3
4
5
6
7
8
9
10
11
12
13
function delay(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(new Date())
}, milliseconds)
})
}

console.log(`${new Date().getSeconds()}s\nDelaying...`)
delay(1000)
.then(newDate => {
console.log(`${newDate.getSeconds()}s`)
})

Promise 最重要的静态方法:

  • Promise.resolve(obj):从另一个 Promise、thenable 对象或者值创建一个新的 Promise
  • Promise.reject(err):创建一个 Promise,该 Promise 会因为 err 终止
  • Promise.all(iterable):从一个可迭代对象创建 Promise,若该 iterable 中的每一项都提供了一个 fulfill 值,则 Promise 最终以包含这些值的列表作为 fulfill 值;若其中有任意一项 reject,则 Promise.all() 返回的 Promise 以第一个 reject 的 err 终止
  • Promise.allSettled(iterable):此方法会等待所有输入的 Promise 或者 fulfill 或者 reject,之后返回一个包含所有 fulfill 值和 reject 原因的列表
  • Promise.race(iterable):返回可迭代对象中第一个 fulfill 或 reject 的 Promise

Promise 关键的实例方法:

  • promise.catch(onRejected):实际上就是 promise.then(undefined, onRejected) 的语法糖
  • promise.finally(onFinally):允许我们设置一个 onFinally 回调函数,在 promise fulfill 或者 reject 时调用

顺序执行

顺序执行意味着,每次只执行一系列任务中的一个,完成后再依次执行后面的任务。这一系列任务的先后顺序必须是预先定义好的,因为一个任务的结果有可能影响后续任务的执行。

An example of sequential execution flow with three tasks

上述执行流程有着不同形式的变种:

  • 顺序执行一系列已知的任务,不需要在它们之间传递数据
  • 前一个任务的输出作为后一个任务的输入(chainpipelinewaterfall
  • 迭代任务集合,同时在每个元素上一个接一个地运行异步任务

package.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"name": "03-promises-web-spider-v2",
"version": "1.0.0",
"private": true,
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"cheerio": "^1.0.0-rc.3",
"mkdirp": "^0.5.1",
"superagent": "^5.2.2",
"slug": "^1.1.0"
},
"engines": {
"node": ">=14"
},
"engineStrict": true
}

spider.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import {promises as fsPromises} from 'fs'
import {dirname} from 'path'
import superagent from 'superagent'
import mkdirp from 'mkdirp'
import {urlToFilename, getPageLinks} from './utils.js'
import {promisify} from 'util'

const mkdirpPromises = promisify(mkdirp)

function download(url, filename) {
console.log(`Downloading ${url}`)
let content
return superagent.get(url)
.then((res) => {
content = res.text
return mkdirpPromises(dirname(filename))
})
.then(() => fsPromises.writeFile(filename, content))
.then(() => {
console.log(`Downloaded and saved: ${url}`)
return content
})
}

function spiderLinks(currentUrl, content, nesting) {
let promise = Promise.resolve()
if (nesting === 0) {
return promise
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
promise = promise.then(() => spider(link, nesting - 1))
}

return promise
}

export function spider(url, nesting) {
const filename = urlToFilename(url)
return fsPromises.readFile(filename, 'utf8')
.catch((err) => {
if (err.code !== 'ENOENT') {
throw err
}

// The file doesn't exist, so let’s download it
return download(url, filename)
})
.then(content => spiderLinks(url, content, nesting))
}

spider-cli.js

1
2
3
4
5
6
7
8
import {spider} from './spider.js'

const url = process.argv[2]
const nesting = Number.parseInt(process.argv[3], 10) || 1

spider(url, nesting)
.then(() => console.log('Download complete'))
.catch(err => console.error(err))

utils.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import {join, extname} from 'path'
import {URL} from 'url'
import slug from 'slug'
import cheerio from 'cheerio'

function getLinkUrl(currentUrl, element) {
const parsedLink = new URL(element.attribs.href || '', currentUrl)
const currentParsedUrl = new URL(currentUrl)
if (parsedLink.hostname !== currentParsedUrl.hostname ||
!parsedLink.pathname) {
return null
}
return parsedLink.toString()
}

export function urlToFilename(url) {
const parsedUrl = new URL(url)
const urlPath = parsedUrl.pathname.split('/')
.filter(function (component) {
return component !== ''
})
.map(function (component) {
return slug(component, {remove: null})
})
.join('/')
let filename = join(parsedUrl.hostname, urlPath)
if (!extname(filename).match(/htm/)) {
filename += '.html'
}

return filename
}

export function getPageLinks(currentUrl, body) {
return Array.from(cheerio.load(body)('a'))
.map(function (element) {
return getLinkUrl(currentUrl, element)
})
.filter(Boolean)
}

node spider-cli.js http://www.baidu.com 2

其中的 spiderLinks() 函数通过循环动态地构建了一条 Promise 链:

  • 先定义一个“空的” Promise 对象(resovle 到 undefined),这个空 Promise 只是作为链条的起点
  • 在循环中,不断将 promise 变量更新为新的 Promise 对象(通过调用上一个 Promise 的 then() 方法得到)。这就是 Promise 的异步遍历模式

for 循环的最后,promise 变量会是最后一个 then() 方法返回的 Promise,因而只有当链条中的所有 Promise 都 resolve 时,promise 才会 resolve。

纵观所有代码,我们可以不需要像使用 callback 那样,强制地包含众多错误传递逻辑。因而大大减少了代码量和出错的机会。

并行执行

在某些情况下,一系列异步任务的执行顺序并不重要,我们需要的只是当所有的任务都完成后能收到通知。

An example of parallel execution with three tasks

虽然 Node.js 是单线程的,但得益于其 non-blocking nature,我们仍可以实现并发行为。

An example of how asynchronous tasks run in parallel

比如我们有一个 Main 函数需要执行两个异步任务:

  • Main 函数首先触发异步任务 Task1 和 Task2 的执行。异步任务触发后,会将程序控制权立即交还给 Main 函数,再转交给 event loop
  • 当 Task1 中的异步任务结束时,event loop 调用 Task1 的回调函数,将控制权交给 Task1。Task1 执行完成自身内部的同步指令,通知 Main 函数并返还控制权
  • 当 Task2 中的异步任务结束时,event loop 调用 Task2 的回调函数,将控制权交给 Task2。在 Task2 的终点,Main 函数再次被通知。Main 函数得知 Task1 和 Task2 全部结束,继续执行或者返回结果

简单来说,在 Node.js 中,我们只能并发地执行异步操作,因为它们的并发行为是由内部的非阻塞 API 控制的。同步(阻塞)操作无法并发地执行,除非它们的执行与异步操作交织在一起,或者由 setTimeout()setImmediate() 包裹。

Promise 实现并发执行流,可以借助内置的 Promise.all() 方法。该方法会返回一个新的 Promise,只有当所有传入的 Promise 都 fulfill 时,新 Promise 才会 fulfill。如果传入的 Promise 之间没有因果关系,这些 Promise 就会并发地执行。

对于前面的 spider 应用,只需要将 spiderLinks() 函数改为如下形式:

1
2
3
4
5
6
7
8
function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return Promise.resolve()
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}

Async/await

Promise 链相对于 callback hell 来说肯定是要好太多的,但是我们仍然需要调用 then() 方法,以及为链条中的每一个任务创建新的函数,对于日常编程中非常普遍的控制流来说还是比较麻烦。而 Async/await 可以帮助我们写出像同步代码一样可读性强、容易理解的异步代码。
Async 函数是一种特殊的函数,在函数体里面可以使用 await 表达式“暂停”任意一个 Promise 的执行,将控制权交还给 async 函数的调用者,等该 Promise revolve 后再返回到暂停的地方继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function delay(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(new Date())
}, milliseconds)
})
}

async function playingWithDelays() {
console.log('Initial date: ', new Date())
const dateAfterOneSecond = await delay(1000)
console.log('Date after one second: ', dateAfterOneSecond)

const dateAfterThreeSeconds = await delay(3000)
console.log('Date after 3 secnods: ', dateAfterThreeSeconds)
return 'done'
}

playingWithDelays()
.then(result => {
console.log(`After 4 seconds: ${result}`)
})

错误处理

Async/await 的另一个巨大的优势在于,它能够标准化 try...catch 代码块的行为,不管是针对同步代码中的 throw,抑或是异步代码中的 Promise reject。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function delayError(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
})
})
}

async function playingWithErrors(throwSyncError) {
try {
if (throwSyncError) {
throw new Error('This is a synchronous error')
}
await delayError(1000)
} catch (err) {
console.log(`We have an error: ${err.message}`)
} finally {
console.log('Done')
}
}

// playingWithErrors(true)
playingWithErrors(false)

串行执行

借助 Async/await,可以对之前的 spider 应用实现很多优化。比如 download() 函数:

1
2
3
4
5
6
7
8
async function download(url, filename) {
console.log(`Downloading ${url}`)
const {text: content} = await superagent.get(url)
await mkdirpPromises(dirname(filename))
await fsPromises.writeFile(filename, content)
console.log(`Downloaded and saved: ${url}`)
return content
}

整段代码行数大大减少,看起来也很“平整”,没有任何层级和缩进。

接下来是 spiderLinks() 函数,使用 async/await 异步地遍历一个列表:

1
2
3
4
5
6
7
8
9
async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
await spider(link, nesting - 1)
}
}

然后是 spider() 函数,如何简单地通过 try...catch 处理错误,令异步代码更加易读:

1
2
3
4
5
6
7
8
9
10
11
12
13
export async function spider(url, nesting) {
const filename = urlToFilename(url)
let content
try {
content = await fsPromises.readFile(filename, 'utf8')
} catch (err) {
if (err.code !== 'ENOENT') {
throw err
}
content = await download(url, filename)
}
return spiderLinks(url, content, nesting)
}

并行执行

使用纯 async/await 实现并行的异步执行流程,可以参考如下代码:

1
2
3
4
5
6
7
8
9
10
async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
for (const promise of promises) {
await promise
}
}

然而上述代码存在一定的问题。如果列表中有一个 Promise reject 了,我们不得不等待列表中其他所有的 Promise 都 resolve,spiderLinks() 函数返回的 Promise 才会 reject。这种行为在多数情况下都是不理想的。
我们通常都会想要在操作发生错误的第一时间捕获错误信息。因而并行执行异步操作,最后仍建议使用下面形式的代码:

1
2
3
4
5
6
7
8
async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition