Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务分发)

Distributing tasks to a set of consumers

将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumersfanout distributionventilator
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。

A messaging pipeline

ZeroMQ Fanout/Fanin 模式

分布式 hashsum 破解器

需要以下组件实现一个标准的并行管线:

  • 一个协调节点负责在多个工作节点间分发任务
  • 多个工作节点承担具体的计算任务
  • 一个用于收集计算结果的节点

The architecture of a typical pipeline with ZeroMQ

即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。

实现 producer

为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 ab 两个字母的话,长度为 3 的字符串组合共有图示的以下几种:
Indexed n-ary tree for alphabet (a, b)

indexed-string-variation 包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
export function* generateTasks(searchHash, alphabet,
maxWordLength, batchSize) {
let nVariations = 0
for (let n = 1; n <= maxWordLength; n++) {
nVariations += Math.pow(alphabet.length, n)
}

console.log('Finding the hashsum source string over ' +
`${nVariations} possible variations`)

let batchStart = 1
while (batchStart <= nVariations) {
const batchEnd = Math.min(
batchStart + batchSize - 1, nVariations)
yield {
searchHash,
alphabet: alphabet,
batchStart,
batchEnd
}

batchStart = batchEnd + 1
}
}

producer.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
const ventilator = new zmq.Push()
await ventilator.bind('tcp://*:5016')
await delay(1000)

const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await ventilator.send(JSON.stringify(task))
}
}

main().catch(err => console.log(err))

  • 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
  • 将每一个生成的任务字符串化,通过 PUSH socket 的 send() 方法发送给工作节点。工作节点以轮询的方式接收不同的任务
实现 worker

process Task.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

export function processTask(task) {
const variationGen = isv.generator(task.alphabet)
console.log('processing from ' +
`${variationGen(task.batchStart)} (${task.batchStart})` +
`to ${variationGen(task.batchEnd)} (${task.batchEnd}`)

for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
const word = variationGen(idx)
const shasum = createHash('sha1')
shasum.update(word)
const digest = shasum.digest('hex')

if (digest === task.searchHash) {
return word
}
}
}

processTask() 遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task 对象中的 searchHash 比较。

worker.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import zmq from 'zeromq'
import { processTask } from './processTask.js'

async function main() {
const fromVentilator = new zmq.Pull()
const toSink = new zmq.Push()

fromVentilator.connect('tcp://localhost:5016')
toSink.connect('tcp://localhost:5017')

for await (const rawMessage of fromVentilator) {
const found = processTask(JSON.parse(rawMessage.toString()))
if (found) {
console.log(`Found! => ${found}`)
await toSink.send(`Found: $found`)
}
}
}

main().catch(err => console.error(err))

worker.js 创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。

实现 results collector

collector.js:

1
2
3
4
5
6
7
8
9
10
11
12
import zmq from 'zeromq'

async function main() {
const sink = new zmq.Pull()
await sink.bind('tcp://*:5017')

for await (const rawMessage of sink) {
console.log('Message from worker: ', rawMessage.toString())
}
}

main().catch(err => console.error(err))

运行以下命令测试结果:

1
2
3
4
node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

AMQP 实现 pipeline 和 competing consumers

Task distribution architecture using a message queue broker

像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。

hashsum 破解器的 AMQP 实现

producer-amqp.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createConfirmChannel()
await channel.assertQueue('tasks_queue')

const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
}

await channel.waitForConfirms()
channel.close()
connection.close()
}

main().catch(err => console.error(err))

  • 此处创建的是一个 confirmChannel,它提供了一个 waitForConfirms() 函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接
  • channel.sendToQueue() 负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由

worker-amqp.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import amqp from 'amqplib'
import { processTask } from './processTask.js'

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('tasks_queue')
channel.consume(queue, async (rawMessage) => {
const found = processTask(
JSON.parse(rawMessage.content.toString()))
if (found) {
console.log(`Found! => ${found}`)
await channel.sendToQueue('results_queue',
Buffer.from(`Found: ${found}`))
}
await channel.ack(rawMessage)
})
}

main().catch(err => console.error(err))

collector-amqp.js:

1
2
3
4
5
6
7
8
9
10
11
12
import amqp from 'amqplib'

async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('results_queue')
channel.consume(queue, msg => {
console.log(`Message from worker: ${msg.content.toString()}`)
})
}

main().catch(err => console.error(err))

运行如下命令测试效果:

1
2
3
4
node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

通过 Redis Streams 实现任务分发

Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。

A Redis Stream consumer group

Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。

producer-redis.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()

const [, , maxLength, searchHash] = process.argv

async function main() {
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await redisClient.xadd('tasks_stream', '*',
'task', JSON.stringify(task))
}

redisClient.disconnect()
}

main().catch(err => console.error(err))

worker-redis.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import Redis from 'ioredis'
import { processTask } from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
await redisClient.xgroup('CREATE', 'tasks_stream',
'workers_group', '$', 'MKSTREAM')
.catch(() => console.log('Consumer group already exists'))

const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'STREAMS',
'tasks_stream', '0')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}

while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
}
}

async function processAndAck(recordId, rawTask) {
const found = processTask(JSON.parse(rawTask))
if (found) {
console.log(`Found! => ${found}`)
await redisClient.xadd('results_stream', '*', 'result',
`Found: ${found}`)
}

await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))
  • xgroup 命令用来确保 consumer group 存在。
    • CREATE 表示我们希望创建一个 consumer group
    • tasks_stream 表示我们想要读取的 stream 的名字
    • workers_group 是 consumer group 的名字
    • 第四个参数表示 consumer group 开始读取的记录的位置。$ 表示当前 stream 中最后一条记录的 ID
    • MKSTREAM 表示如果 stream 不存在则创建它
  • 通过 xreadgroup 命令读取属于当前 consumer 的所有 pending 的记录。
    • 'GROUP''workers_group'consumerName 用来指代 consumer group 和 consumer 的名字
    • STREAMStasks_stream 用来指代我们想要读取的 stream 的名字
    • 0 用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
  • 通过另外一条 xreadgroup 命令读取 stream 里新增加的记录。
    • 'BLOCK''0' 两个参数表示如果没有新的消息,就一直阻塞等待。'0' 具体表示一直等待永不超时
    • 'COUNT''1' 表示一次请求只获取一条记录
    • 特殊 ID > 表示只获取还没有被当前的 consumer group 处理过的消息
  • processAndAck() 函数负责当 xreadgroup() 返回的记录被处理完成时,调用 xack 命令进行确认,将该记录从当前 consumer 的 pending 列表里移除

collector-redis.js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import Redis from 'ioredis'

const redisClient = new Redis()

async function main() {
let lastRecordId = '$'
while (true) {
const data = await redisClient.xread(
'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
for (const [, logs] of data) {
for (const [recordId, [, message]] of logs) {
console.log(`Message from worker: ${message}`)
lastRecordId = recordId
}
}
}
}

main().catch(err => console.error(err))

运行程序测试效果:

1
2
3
4
node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition