回调函数(Callbacks)是 Node.js 中异步编程的底层构件,但它们远远达不到对用户友好的程度。对于实现代码中最常见的串行控制流,一个未经训练的开发者很容易陷入到 callback hell 问题中。即便实现是正确的,该串行控制流也会显得不必要的复杂和脆弱。
为了获得更好的异步编程体验,第一个出现的就是 promise,一种保存了异步操作的状态和最终结果的对象。Promise 可以轻易地被串联起来,实现串行控制流,可以像其他任何对象一样自由地转移。Pormise 大大简化了异步代码,后来在此基础上又有了 async 和 await,能够令异步代码看起来就像是同步代码一样。
Promises
Promises 是 ECMAScript 2015 标准(ES6)的一部分,为传递异步结果提供了一种健壮的解决方案,替代原本的 CPS 样式的回调函数。Promise 能够令所有主要的异步控制流更加易读、简洁和健壮。
Promise 是一种用来代表异步操作的最终结果(或错误)的对象。在专业术语中,当异步操作未完成时,我们称 Promise 是 pending 的;当异步操作成功结束时,Promise 是 fulfilled 的;当异步操作因为错误终止时,Promise 是 rejected 的;当 Promise 或者是 fulfilled 或者是 rejected,则将其认定为 settled。
Promise 对象的 then()
方法可以获取成功执行后的结果或者终止时报出的错误:1
promise.then(onFulfilled, onRejected)
其中 onFulfilled
是一个回调函数,最终会接收到 Promise 成功时的值;onRejected
是另一个回调函数,最终会接收 Promise 异常终止时的值(如果有的话)。
基于回调函数的如下代码:1
2
3
4
5
6asyncOperation(arg, (err, result) => {
if (err) {
// handle the error
}
// do stuff with the result
})
Promise 实现上述同样的功能,则更加优雅、结构化:1
2
3
4
5
6asyncOperationPromise(arg)
.then(result => {
// do stuff with result
}, err => {
// handle the error
})
asyncOperationPromise()
会返回一个 Promise,可以被用来获取最终结果的值或者失败的原因。但最为关键的属性是,then()
方法会同步地返回另一个 Promise。
更进一步地,如果 onFulfilled
或者 onRejected
函数返回一个值 x
,那么 then()
方法返回的 Promise 会有以下行为:
- 若
x
是一个值,则then()
返回的 Promise 使用x
作为自身完成时的值 - 若
x
是一个 Promise 且成功完成,则x
完成时返回的值作为then()
返回的 Promise 完成时的值 - 若
x
是一个 Promise 且因为错误终止,则x
终止的原因作为then()
返回的 Promise 终止的原因
上述行为能够令我们将多个 promise 连接成链,轻松地将异步操作聚合在一起。如果我们没有指定一个 onFulfilled
或者 onRejected
handler,Promise 完成时的值或者终止时的原因都会自动地传递给链条中的下一个 Promise。通过 Promise 链,任务的执行顺序突然变得很简单。1
2
3
4
5
6
7
8
9
10
11
12asyncOperationPromise(arg)
.then(result1 => {
// return another promise
return asyncOperationPromise(arg2)
})
.then(result2 => {
// return a value
return 'done'
})
.then(undefined, err => {
// any error in the chain is caught here
})
promise API
Promise 构造函数(new Promise((resolve, reject) => {})
)会创建一个新的 Promise 实例,其完成还是终止取决于作为参数传入的函数的行为。
作为参数传入的函数接收如下两个参数:
resolve(obj)
:resolve 是一个函数,在调用时为 Promise 提供完成时的值。当obj
是值时,则obj
本身作为 Promise 完成时的值;当obj
是另一个 Promise 时,则obj
完成时的值作为当前 Promise 完成时的值reject(err)
:Promise 因为err
终止
1 | function delay(milliseconds) { |
Promise 最重要的静态方法:
Promise.resolve(obj)
:从另一个 Promise、thenable 对象或者值创建一个新的 PromisePromise.reject(err)
:创建一个 Promise,该 Promise 会因为err
终止Promise.all(iterable)
:从一个可迭代对象创建 Promise,若该 iterable 中的每一项都提供了一个 fulfill 值,则 Promise 最终以包含这些值的列表作为 fulfill 值;若其中有任意一项 reject,则 Promise.all() 返回的 Promise 以第一个 reject 的 err 终止Promise.allSettled(iterable)
:此方法会等待所有输入的 Promise 或者 fulfill 或者 reject,之后返回一个包含所有 fulfill 值和 reject 原因的列表Promise.race(iterable)
:返回可迭代对象中第一个 fulfill 或 reject 的 Promise
Promise 关键的实例方法:
promise.catch(onRejected)
:实际上就是promise.then(undefined, onRejected)
的语法糖promise.finally(onFinally)
:允许我们设置一个onFinally
回调函数,在promise
fulfill 或者 reject 时调用
顺序执行
顺序执行意味着,每次只执行一系列任务中的一个,完成后再依次执行后面的任务。这一系列任务的先后顺序必须是预先定义好的,因为一个任务的结果有可能影响后续任务的执行。
上述执行流程有着不同形式的变种:
- 顺序执行一系列已知的任务,不需要在它们之间传递数据
- 前一个任务的输出作为后一个任务的输入(chain、pipeline、waterfall)
- 迭代任务集合,同时在每个元素上一个接一个地运行异步任务
package.json
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19{
"name": "03-promises-web-spider-v2",
"version": "1.0.0",
"private": true,
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"cheerio": "^1.0.0-rc.3",
"mkdirp": "^0.5.1",
"superagent": "^5.2.2",
"slug": "^1.1.0"
},
"engines": {
"node": ">=14"
},
"engineStrict": true
}
spider.js
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50import {promises as fsPromises} from 'fs'
import {dirname} from 'path'
import superagent from 'superagent'
import mkdirp from 'mkdirp'
import {urlToFilename, getPageLinks} from './utils.js'
import {promisify} from 'util'
const mkdirpPromises = promisify(mkdirp)
function download(url, filename) {
console.log(`Downloading ${url}`)
let content
return superagent.get(url)
.then((res) => {
content = res.text
return mkdirpPromises(dirname(filename))
})
.then(() => fsPromises.writeFile(filename, content))
.then(() => {
console.log(`Downloaded and saved: ${url}`)
return content
})
}
function spiderLinks(currentUrl, content, nesting) {
let promise = Promise.resolve()
if (nesting === 0) {
return promise
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
promise = promise.then(() => spider(link, nesting - 1))
}
return promise
}
export function spider(url, nesting) {
const filename = urlToFilename(url)
return fsPromises.readFile(filename, 'utf8')
.catch((err) => {
if (err.code !== 'ENOENT') {
throw err
}
// The file doesn't exist, so let’s download it
return download(url, filename)
})
.then(content => spiderLinks(url, content, nesting))
}
spider-cli.js
:1
2
3
4
5
6
7
8import {spider} from './spider.js'
const url = process.argv[2]
const nesting = Number.parseInt(process.argv[3], 10) || 1
spider(url, nesting)
.then(() => console.log('Download complete'))
.catch(err => console.error(err))
utils.js
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40import {join, extname} from 'path'
import {URL} from 'url'
import slug from 'slug'
import cheerio from 'cheerio'
function getLinkUrl(currentUrl, element) {
const parsedLink = new URL(element.attribs.href || '', currentUrl)
const currentParsedUrl = new URL(currentUrl)
if (parsedLink.hostname !== currentParsedUrl.hostname ||
!parsedLink.pathname) {
return null
}
return parsedLink.toString()
}
export function urlToFilename(url) {
const parsedUrl = new URL(url)
const urlPath = parsedUrl.pathname.split('/')
.filter(function (component) {
return component !== ''
})
.map(function (component) {
return slug(component, {remove: null})
})
.join('/')
let filename = join(parsedUrl.hostname, urlPath)
if (!extname(filename).match(/htm/)) {
filename += '.html'
}
return filename
}
export function getPageLinks(currentUrl, body) {
return Array.from(cheerio.load(body)('a'))
.map(function (element) {
return getLinkUrl(currentUrl, element)
})
.filter(Boolean)
}
node spider-cli.js http://www.baidu.com 2
其中的 spiderLinks()
函数通过循环动态地构建了一条 Promise 链:
- 先定义一个“空的” Promise 对象(resovle 到
undefined
),这个空 Promise 只是作为链条的起点 - 在循环中,不断将
promise
变量更新为新的 Promise 对象(通过调用上一个 Promise 的then()
方法得到)。这就是 Promise 的异步遍历模式
在 for
循环的最后,promise
变量会是最后一个 then()
方法返回的 Promise,因而只有当链条中的所有 Promise 都 resolve 时,promise
才会 resolve。
纵观所有代码,我们可以不需要像使用 callback 那样,强制地包含众多错误传递逻辑。因而大大减少了代码量和出错的机会。
并行执行
在某些情况下,一系列异步任务的执行顺序并不重要,我们需要的只是当所有的任务都完成后能收到通知。
虽然 Node.js 是单线程的,但得益于其 non-blocking nature,我们仍可以实现并发行为。
比如我们有一个 Main 函数需要执行两个异步任务:
- Main 函数首先触发异步任务 Task1 和 Task2 的执行。异步任务触发后,会将程序控制权立即交还给 Main 函数,再转交给 event loop
- 当 Task1 中的异步任务结束时,event loop 调用 Task1 的回调函数,将控制权交给 Task1。Task1 执行完成自身内部的同步指令,通知 Main 函数并返还控制权
- 当 Task2 中的异步任务结束时,event loop 调用 Task2 的回调函数,将控制权交给 Task2。在 Task2 的终点,Main 函数再次被通知。Main 函数得知 Task1 和 Task2 全部结束,继续执行或者返回结果
简单来说,在 Node.js 中,我们只能并发地执行异步操作,因为它们的并发行为是由内部的非阻塞 API 控制的。同步(阻塞)操作无法并发地执行,除非它们的执行与异步操作交织在一起,或者由 setTimeout()
、setImmediate()
包裹。
Promise 实现并发执行流,可以借助内置的 Promise.all()
方法。该方法会返回一个新的 Promise,只有当所有传入的 Promise 都 fulfill 时,新 Promise 才会 fulfill。如果传入的 Promise 之间没有因果关系,这些 Promise 就会并发地执行。
对于前面的 spider 应用,只需要将 spiderLinks()
函数改为如下形式:1
2
3
4
5
6
7
8function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return Promise.resolve()
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}
Async/await
Promise 链相对于 callback hell 来说肯定是要好太多的,但是我们仍然需要调用 then()
方法,以及为链条中的每一个任务创建新的函数,对于日常编程中非常普遍的控制流来说还是比较麻烦。而 Async/await 可以帮助我们写出像同步代码一样可读性强、容易理解的异步代码。
Async 函数是一种特殊的函数,在函数体里面可以使用 await
表达式“暂停”任意一个 Promise 的执行,将控制权交还给 async 函数的调用者,等该 Promise revolve 后再返回到暂停的地方继续执行。
1 | function delay(milliseconds) { |
错误处理
Async/await 的另一个巨大的优势在于,它能够标准化 try...catch
代码块的行为,不管是针对同步代码中的 throw
,抑或是异步代码中的 Promise reject。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23function delayError(milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
})
})
}
async function playingWithErrors(throwSyncError) {
try {
if (throwSyncError) {
throw new Error('This is a synchronous error')
}
await delayError(1000)
} catch (err) {
console.log(`We have an error: ${err.message}`)
} finally {
console.log('Done')
}
}
// playingWithErrors(true)
playingWithErrors(false)
串行执行
借助 Async/await,可以对之前的 spider 应用实现很多优化。比如 download()
函数:1
2
3
4
5
6
7
8async function download(url, filename) {
console.log(`Downloading ${url}`)
const {text: content} = await superagent.get(url)
await mkdirpPromises(dirname(filename))
await fsPromises.writeFile(filename, content)
console.log(`Downloaded and saved: ${url}`)
return content
}
整段代码行数大大减少,看起来也很“平整”,没有任何层级和缩进。
接下来是 spiderLinks()
函数,使用 async/await 异步地遍历一个列表:1
2
3
4
5
6
7
8
9async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
await spider(link, nesting - 1)
}
}
然后是 spider()
函数,如何简单地通过 try...catch
处理错误,令异步代码更加易读:1
2
3
4
5
6
7
8
9
10
11
12
13export async function spider(url, nesting) {
const filename = urlToFilename(url)
let content
try {
content = await fsPromises.readFile(filename, 'utf8')
} catch (err) {
if (err.code !== 'ENOENT') {
throw err
}
content = await download(url, filename)
}
return spiderLinks(url, content, nesting)
}
并行执行
使用纯 async/await 实现并行的异步执行流程,可以参考如下代码:1
2
3
4
5
6
7
8
9
10async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
for (const promise of promises) {
await promise
}
}
然而上述代码存在一定的问题。如果列表中有一个 Promise reject 了,我们不得不等待列表中其他所有的 Promise 都 resolve,spiderLinks()
函数返回的 Promise 才会 reject。这种行为在多数情况下都是不理想的。
我们通常都会想要在操作发生错误的第一时间捕获错误信息。因而并行执行异步操作,最后仍建议使用下面形式的代码:1
2
3
4
5
6
7
8async function spiderLinks(currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}