From: Shinigami Date: Thu, 11 Feb 2021 13:57:37 +0000 (+0100) Subject: Provide a cluster worker pool (#92) X-Git-Tag: v2.0.0-beta.2~54 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=325f50bc1777ea44abc9736ce9d780ec0c8f90e2;p=poolifier.git Provide a cluster worker pool (#92) Co-authored-by: aardizio Co-authored-by: Jérôme Benoit --- diff --git a/benchmarks/bench.js b/benchmarks/bench.js index 017ac20b..b2091613 100644 --- a/benchmarks/bench.js +++ b/benchmarks/bench.js @@ -1,79 +1,35 @@ const Benchmark = require('benchmark') +const { dynamicClusterTest } = require('./cluster/dynamic') +const { fixedClusterTest } = require('./cluster/fixed') +const { dynamicThreadTest } = require('./thread/dynamic') +const { fixedThreadTest } = require('./thread/fixed') + const suite = new Benchmark.Suite() -const { FixedThreadPool } = require('../lib/index') -const { DynamicThreadPool } = require('../lib/index') -const size = 30 -const tasks = 1 const LIST_FORMATTER = new Intl.ListFormat('en-US', { style: 'long', type: 'conjunction' }) -// pools -const fixedPool = new FixedThreadPool(size, './threadWorker.js', { - maxTasks: 10000 -}) -const dynamicPool = new DynamicThreadPool( - size / 2, - size * 3, - './threadWorker.js', - { maxTasks: 10000 } -) -const workerData = { proof: 'ok' } - // wait some seconds before start, my pools need to load threads !!! 
setTimeout(async () => { test() }, 3000) -// fixed pool proof -async function fixedTest () { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - fixedPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => { - console.error(err) - }) - } - }) -} - -async function dynamicTest () { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - dynamicPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => console.error(err)) - } - }) -} - async function test () { // add tests suite - .add('PioardiStaticPool', async function () { - await fixedTest() + .add('Pioardi:Static:ThreadPool', async function () { + await fixedThreadTest() + }) + .add('Pioardi:Dynamic:ThreadPool', async function () { + await dynamicThreadTest() + }) + .add('Pioardi:Static:ClusterPool', async function () { + await fixedClusterTest() }) - .add('PioardiDynamicPool', async function () { - await dynamicTest() + .add('Pioardi:Dynamic:ClusterPool', async function () { + await dynamicClusterTest() }) // add listeners .on('cycle', function (event) { diff --git a/benchmarks/cluster/dynamic.js b/benchmarks/cluster/dynamic.js new file mode 100644 index 00000000..9321054d --- /dev/null +++ b/benchmarks/cluster/dynamic.js @@ -0,0 +1,34 @@ +const { DynamicClusterPool } = require('../../lib/index') + +const size = 30 + +const dynamicPool = new DynamicClusterPool( + size / 2, + size * 3, + './benchmarks/cluster/worker.js', + { + maxTasks: 10000 + } +) + +async function dynamicClusterTest ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + dynamicPool + .execute(workerData) + .then(res => { + 
executions++ + if (executions === tasks) { + return resolve('FINISH') + } + return null + }) + .catch(err => console.error(err)) + } + }) +} + +module.exports = { dynamicClusterTest } diff --git a/benchmarks/cluster/fixed.js b/benchmarks/cluster/fixed.js new file mode 100644 index 00000000..c8fe7132 --- /dev/null +++ b/benchmarks/cluster/fixed.js @@ -0,0 +1,31 @@ +const { FixedClusterPool } = require('../../lib/index') + +const size = 30 + +const fixedPool = new FixedClusterPool(size, './benchmarks/cluster/worker.js', { + maxTasks: 10000 +}) + +async function fixedClusterTest ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + fixedPool + .execute(workerData) + .then(res => { + executions++ + if (executions === tasks) { + return resolve('FINISH') + } + return null + }) + .catch(err => { + console.error(err) + }) + } + }) +} + +module.exports = { fixedClusterTest } diff --git a/benchmarks/cluster/worker.js b/benchmarks/cluster/worker.js new file mode 100644 index 00000000..82408e69 --- /dev/null +++ b/benchmarks/cluster/worker.js @@ -0,0 +1,15 @@ +'use strict' +const { ClusterWorker } = require('../../lib/index') + +function yourFunction (data) { + for (let i = 0; i <= 1000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('This is the main thread ' + isMainThread) + return { ok: 1 } +} + +module.exports = new ClusterWorker(yourFunction) diff --git a/benchmarks/workerThreadsWorker.js b/benchmarks/external/workerThreadsWorker.js similarity index 100% rename from benchmarks/workerThreadsWorker.js rename to benchmarks/external/workerThreadsWorker.js diff --git a/benchmarks/workerpoolWorker.js b/benchmarks/external/workerpoolWorker.js similarity index 100% rename from benchmarks/workerpoolWorker.js rename to benchmarks/external/workerpoolWorker.js diff --git a/benchmarks/myBench.js b/benchmarks/myBench.js index 
216b338e..24ccba53 100644 --- a/benchmarks/myBench.js +++ b/benchmarks/myBench.js @@ -7,18 +7,18 @@ const size = 16 // pools const workerThreadsPool = new WorkerThreadsPool({ max: size }) -const workerPool = workerpool.pool('./workerpoolWorker.js', { +const workerPool = workerpool.pool('./external/workerpoolWorker.js', { minWorkers: size / 2, maxWorkers: size * 3, workerType: 'thread' }) -const fixedPool = new FixedThreadPool(size, './threadWorker.js', { +const fixedPool = new FixedThreadPool(size, './thread/worker.js', { maxTasks: 10000 }) const dynamicPool = new DynamicThreadPool( size / 2, size * 3, - './threadWorker.js', + './thread/worker.js', { maxTasks: 10000 } ) @@ -74,7 +74,7 @@ async function workerThreadsPoolTest () { for (let i = 0; i <= tasks; i++) { new Promise((resolve, reject) => { workerThreadsPool.acquire( - './workerThreadsWorker.js', + './external/workerThreadsWorker.js', { workerData: workerData }, (err, worker) => { if (err) { diff --git a/benchmarks/thread/dynamic.js b/benchmarks/thread/dynamic.js new file mode 100644 index 00000000..ad980009 --- /dev/null +++ b/benchmarks/thread/dynamic.js @@ -0,0 +1,29 @@ +const { DynamicThreadPool } = require('../../lib/index') + +const size = 30 + +const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js', { + maxTasks: 10000 +}) + +async function dynamicThreadTest ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + dynamicPool + .execute(workerData) + .then(res => { + executions++ + if (executions === tasks) { + return resolve('FINISH') + } + return null + }) + .catch(err => console.error(err)) + } + }) +} + +module.exports = { dynamicThreadTest } diff --git a/benchmarks/thread/fixed.js b/benchmarks/thread/fixed.js new file mode 100644 index 00000000..b5966828 --- /dev/null +++ b/benchmarks/thread/fixed.js @@ -0,0 +1,31 @@ +const { FixedThreadPool } = 
require('../../lib/index') + +const size = 30 + +const fixedPool = new FixedThreadPool(size, './worker.js', { + maxTasks: 10000 +}) + +async function fixedThreadTest ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + fixedPool + .execute(workerData) + .then(res => { + executions++ + if (executions === tasks) { + return resolve('FINISH') + } + return null + }) + .catch(err => { + console.error(err) + }) + } + }) +} + +module.exports = { fixedThreadTest } diff --git a/benchmarks/threadWorker.js b/benchmarks/thread/worker.js similarity index 83% rename from benchmarks/threadWorker.js rename to benchmarks/thread/worker.js index b8f974d8..7272a485 100644 --- a/benchmarks/threadWorker.js +++ b/benchmarks/thread/worker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../lib/index') +const { ThreadWorker } = require('../../lib/index') function yourFunction (data) { for (let i = 0; i <= 1000; i++) { diff --git a/package-lock.json b/package-lock.json index 29383f86..e18dc723 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2175,9 +2175,9 @@ } }, "graceful-fs": { - "version": "4.2.6", - "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz", - "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==", + "version": "4.2.5", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz", + "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==", "dev": true }, "graphql": { @@ -3871,12 +3871,6 @@ "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==", "dev": true }, - "queue-microtask": { - "version": "1.2.2", - "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz", - "integrity": 
"sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==", - "dev": true - }, "randombytes": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz", @@ -4073,13 +4067,10 @@ } }, "run-parallel": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz", - "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==", - "dev": true, - "requires": { - "queue-microtask": "^1.2.2" - } + "version": "1.1.10", + "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz", + "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==", + "dev": true }, "safe-buffer": { "version": "5.2.1", diff --git a/src/index.ts b/src/index.ts index 4bd61868..95ef8255 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,12 +1,26 @@ -import { DynamicThreadPool } from './dynamic' -import { FixedThreadPool } from './fixed' -import { ThreadWorker } from './workers' +import { DynamicClusterPool } from './pools/cluster/dynamic' +import { FixedClusterPool } from './pools/cluster/fixed' +import { DynamicThreadPool } from './pools/thread/dynamic' +import { FixedThreadPool } from './pools/thread/fixed' +import { ClusterWorker } from './worker/cluster-worker' +import { ThreadWorker } from './worker/thread-worker' -export { DynamicThreadPoolOptions } from './dynamic' -export { - Draft, +export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic' +export type { + FixedClusterPoolOptions, + WorkerWithMessageChannel as ClusterWorkerWithMessageChannel +} from './pools/cluster/fixed' +export type { DynamicThreadPoolOptions } from './pools/thread/dynamic' +export type { FixedThreadPoolOptions, - WorkerWithMessageChannel -} from './fixed' -export { ThreadWorkerOptions } from './workers' -export { FixedThreadPool, DynamicThreadPool, ThreadWorker } + 
WorkerWithMessageChannel as ThreadWorkerWithMessageChannel +} from './pools/thread/fixed' +export type { WorkerOptions } from './worker/worker-options' +export { + FixedThreadPool, + FixedClusterPool, + DynamicClusterPool, + DynamicThreadPool, + ThreadWorker, + ClusterWorker +} diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts new file mode 100644 index 00000000..d4bf30f3 --- /dev/null +++ b/src/pools/cluster/dynamic.ts @@ -0,0 +1,75 @@ +import { EventEmitter } from 'events' +import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed' +import { FixedClusterPool } from './fixed' + +class MyEmitter extends EventEmitter {} + +export type DynamicClusterPoolOptions = FixedClusterPoolOptions + +/** + * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer. + * + * This cluster pool will create new workers when the other ones are busy, until the max number of workers, + * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method. + * + * @author [Christopher Quadflieg](https://github.com/Shinigami92) + * @since 2.0.0 + */ +export class DynamicClusterPool< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Data = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Response = any +> extends FixedClusterPool { + public readonly emitter: MyEmitter + + /** + * @param min Min number of workers that will be always active + * @param max Max number of workers that will be active + * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine. + * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. 
Default: `{ maxTasks: 1000 }` + */ + public constructor ( + public readonly min: number, + public readonly max: number, + public readonly filename: string, + public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 } + ) { + super(min, filename, opts) + + this.emitter = new MyEmitter() + } + + protected chooseWorker (): WorkerWithMessageChannel { + let worker: WorkerWithMessageChannel | undefined + for (const entry of this.tasks) { + if (entry[1] === 0) { + worker = entry[0] + break + } + } + + if (worker) { + // a worker is free, use it + return worker + } else { + if (this.workers.length === this.max) { + this.emitter.emit('FullPool') + return super.chooseWorker() + } + // all workers are busy create a new worker + const worker = this.newWorker() + worker.on('message', (message: { kill?: number }) => { + if (message.kill) { + worker.send({ kill: 1 }) + worker.kill() + // clean workers from data structures + const workerIndex = this.workers.indexOf(worker) + this.workers.splice(workerIndex, 1) + this.tasks.delete(worker) + } + }) + return worker + } + } +} diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts new file mode 100644 index 00000000..332daa4d --- /dev/null +++ b/src/pools/cluster/fixed.ts @@ -0,0 +1,162 @@ +import type { SendHandle } from 'child_process' +import { fork, isMaster, setupMaster, Worker } from 'cluster' +import type { MessageValue } from '../../utility-types' + +export type WorkerWithMessageChannel = Worker // & Draft + +export interface FixedClusterPoolOptions { + /** + * A function that will listen for error event on each worker. + */ + errorHandler?: (this: Worker, e: Error) => void + /** + * A function that will listen for online event on each worker. + */ + onlineHandler?: (this: Worker) => void + /** + * A function that will listen for exit event on each worker. 
+ */ + exitHandler?: (this: Worker, code: number) => void + /** + * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). + * + * @default 1000 + */ + maxTasks?: number + /** + * Key/value pairs to add to worker process environment. + * + * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + env?: any +} + +/** + * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer. + * + * This pool will select the worker in a round robin fashion. + * + * @author [Christopher Quadflieg](https://github.com/Shinigami92) + * @since 2.0.0 + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class FixedClusterPool { + public readonly workers: WorkerWithMessageChannel[] = [] + public nextWorker: number = 0 + + // workerId as key and an integer value + public readonly tasks: Map = new Map< + WorkerWithMessageChannel, + number + >() + + protected id: number = 0 + + /** + * @param numWorkers Number of workers for this pool. + * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine. + * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` + */ + public constructor ( + public readonly numWorkers: number, + public readonly filePath: string, + public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 } + ) { + if (!isMaster) { + throw new Error('Cannot start a cluster pool from a worker!') + } + // TODO christopher 2021-02-09: Improve this check e.g. 
with a pattern or blank check + if (!this.filePath) { + throw new Error('Please specify a file with a worker implementation') + } + + setupMaster({ + exec: this.filePath + }) + + for (let i = 1; i <= this.numWorkers; i++) { + this.newWorker() + } + } + + public destroy (): void { + for (const worker of this.workers) { + worker.kill() + } + } + + /** + * Execute the task specified into the constructor with the data parameter. + * + * @param data The input for the task specified. + * @returns Promise that is resolved when the task is done. + */ + public execute (data: Data): Promise { + // configure worker to handle message with the specified task + const worker: WorkerWithMessageChannel = this.chooseWorker() + // console.log('FixedClusterPool#execute choosen worker:', worker) + const previousWorkerIndex = this.tasks.get(worker) + if (previousWorkerIndex !== undefined) { + this.tasks.set(worker, previousWorkerIndex + 1) + } else { + throw Error('Worker could not be found in tasks map') + } + const id: number = ++this.id + const res: Promise = this.internalExecute(worker, id) + // console.log('FixedClusterPool#execute send data to worker:', worker) + worker.send({ data: data || {}, id: id }) + return res + } + + protected internalExecute ( + worker: WorkerWithMessageChannel, + id: number + ): Promise { + return new Promise((resolve, reject) => { + const listener: ( + message: MessageValue, + handle: SendHandle + ) => void = message => { + // console.log('FixedClusterPool#internalExecute listener:', message) + if (message.id === id) { + worker.removeListener('message', listener) + const previousWorkerIndex = this.tasks.get(worker) + if (previousWorkerIndex !== undefined) { + this.tasks.set(worker, previousWorkerIndex + 1) + } else { + throw Error('Worker could not be found in tasks map') + } + if (message.error) reject(message.error) + else resolve(message.data as Response) + } + } + worker.on('message', listener) + }) + } + + protected chooseWorker (): 
WorkerWithMessageChannel { + if (this.workers.length - 1 === this.nextWorker) { + this.nextWorker = 0 + return this.workers[this.nextWorker] + } else { + this.nextWorker++ + return this.workers[this.nextWorker] + } + } + + protected newWorker (): WorkerWithMessageChannel { + const worker: WorkerWithMessageChannel = fork(this.opts.env) + worker.on('error', this.opts.errorHandler ?? (() => {})) + worker.on('online', this.opts.onlineHandler ?? (() => {})) + // TODO handle properly when a worker exit + worker.on('exit', this.opts.exitHandler ?? (() => {})) + this.workers.push(worker) + // we will attach a listener for every task, + // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size + worker.setMaxListeners(this.opts.maxTasks ?? 1000) + // init tasks map + this.tasks.set(worker, 0) + return worker + } +} diff --git a/src/dynamic.ts b/src/pools/thread/dynamic.ts similarity index 90% rename from src/dynamic.ts rename to src/pools/thread/dynamic.ts index b4c6bddf..e80276d9 100644 --- a/src/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,9 +1,6 @@ import { EventEmitter } from 'events' -import { - FixedThreadPool, - FixedThreadPoolOptions, - WorkerWithMessageChannel -} from './fixed' +import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed' +import { FixedThreadPool } from './fixed' class MyEmitter extends EventEmitter {} @@ -18,12 +15,12 @@ export type DynamicThreadPoolOptions = FixedThreadPoolOptions * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 */ -/* eslint-disable @typescript-eslint/no-explicit-any */ export class DynamicThreadPool< + // eslint-disable-next-line @typescript-eslint/no-explicit-any Data = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any Response = any > extends FixedThreadPool { - /* eslint-enable @typescript-eslint/no-explicit-any */ public readonly emitter: MyEmitter /** diff --git a/src/fixed.ts 
b/src/pools/thread/fixed.ts similarity index 95% rename from src/fixed.ts rename to src/pools/thread/fixed.ts index d2c6ba0c..36eddace 100644 --- a/src/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -1,6 +1,5 @@ import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' - -export type Draft = { -readonly [P in keyof T]?: T[P] } +import type { Draft, MessageValue } from '../../utility-types' export type WorkerWithMessageChannel = Worker & Draft @@ -101,11 +100,7 @@ export class FixedThreadPool { id: number ): Promise { return new Promise((resolve, reject) => { - const listener = (message: { - id: number - error?: string - data: Response - }): void => { + const listener: (message: MessageValue) => void = message => { if (message.id === id) { worker.port2?.removeListener('message', listener) const previousWorkerIndex = this.tasks.get(worker) @@ -115,7 +110,7 @@ export class FixedThreadPool { throw Error('Worker could not be found in tasks map') } if (message.error) reject(message.error) - else resolve(message.data) + else resolve(message.data as Response) } } worker.port2?.on('message', listener) diff --git a/src/utility-types.ts b/src/utility-types.ts new file mode 100644 index 00000000..5a3ef4c5 --- /dev/null +++ b/src/utility-types.ts @@ -0,0 +1,9 @@ +export type Draft = { -readonly [P in keyof T]?: T[P] } + +export interface MessageValue { + readonly data?: Data + readonly id?: number + readonly kill?: number + readonly error?: string + readonly parent?: MessagePort +} diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts new file mode 100644 index 00000000..e750126b --- /dev/null +++ b/src/worker/cluster-worker.ts @@ -0,0 +1,96 @@ +import { AsyncResource } from 'async_hooks' +import { isMaster, worker } from 'cluster' +import type { MessageValue } from '../utility-types' +import type { WorkerOptions } from './worker-options' + +/** + * An example worker that will be always alive, you just need to **extend** this class if you 
want a static pool. + * + * When this worker is inactive for more than 1 minute, it will send this info to the main worker, + * if you are using DynamicClusterPool, the workers created after will be killed, the min num of worker will be guaranteed. + * + * @author [Christopher Quadflieg](https://github.com/Shinigami92) + * @since 2.0.0 + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class ClusterWorker extends AsyncResource { + protected readonly maxInactiveTime: number + protected readonly async: boolean + protected lastTask: number + protected readonly interval?: NodeJS.Timeout + + public constructor ( + fn: (data: Data) => Response, + public readonly opts: WorkerOptions = {} + ) { + super('worker-cluster-pool:pioardi') + + this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 + this.async = !!this.opts.async + this.lastTask = Date.now() + if (!fn) throw new Error('Fn parameter is mandatory') + // keep the worker active + if (!isMaster) { + // console.log('ClusterWorker#constructor', 'is not master') + this.interval = setInterval( + this.checkAlive.bind(this), + this.maxInactiveTime / 2 + ) + this.checkAlive.bind(this)() + } + worker.on('message', (value: MessageValue) => { + // console.log("cluster.on('message', value)", value) + if (value?.data && value.id) { + // here you will receive messages + // console.log('This is the main worker ' + isMaster) + if (this.async) { + this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this.run.bind(this), this, fn, value) + } + } else if (value.kill) { + // here is time to kill this worker, just clearing the interval + if (this.interval) clearInterval(this.interval) + this.emitDestroy() + } + }) + } + + protected checkAlive (): void { + if (Date.now() - this.lastTask > this.maxInactiveTime) { + worker.send({ kill: 1 }) + } + } + + protected run ( + fn: (data?: Data) => Response, + value: MessageValue + ): void { + try { + const res = 
fn(value.data as Data) + worker.send({ data: res, id: value.id }) + this.lastTask = Date.now() + } catch (e) { + const err = e instanceof Error ? e.message : e + worker.send({ error: err, id: value.id }) + this.lastTask = Date.now() + } + } + + protected runAsync ( + fn: (data?: Data) => Promise, + value: MessageValue + ): void { + fn(value.data) + .then(res => { + worker.send({ data: res, id: value.id }) + this.lastTask = Date.now() + return null + }) + .catch(e => { + const err = e instanceof Error ? e.message : e + worker.send({ error: err, id: value.id }) + this.lastTask = Date.now() + }) + } +} diff --git a/src/workers.ts b/src/worker/thread-worker.ts similarity index 59% rename from src/workers.ts rename to src/worker/thread-worker.ts index dc4f3194..d0af904b 100644 --- a/src/workers.ts +++ b/src/worker/thread-worker.ts @@ -1,20 +1,7 @@ import { AsyncResource } from 'async_hooks' import { isMainThread, parentPort } from 'worker_threads' - -export interface ThreadWorkerOptions { - /** - * Max time to wait tasks to work on (in ms), after this period the new worker threads will die. - * - * @default 60.000 ms - */ - maxInactiveTime?: number - /** - * `true` if your function contains async pieces, else `false`. - * - * @default false - */ - async?: boolean -} +import type { MessageValue } from '../utility-types' +import type { WorkerOptions } from './worker-options' /** * An example worker that will be always alive, you just need to **extend** this class if you want a static pool. 
@@ -35,7 +22,7 @@ export class ThreadWorker extends AsyncResource { public constructor ( fn: (data: Data) => Response, - public readonly opts: ThreadWorkerOptions = {} + public readonly opts: WorkerOptions = {} ) { super('worker-thread-pool:pioardi') @@ -51,33 +38,25 @@ export class ThreadWorker extends AsyncResource { ) this.checkAlive.bind(this)() } - parentPort?.on( - 'message', - (value: { - data?: Response - id?: number - parent?: MessagePort - kill?: number - }) => { - if (value?.data && value.id) { - // here you will receive messages - // console.log('This is the main thread ' + isMainThread) - if (this.async) { - this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) - } else { - this.runInAsyncScope(this.run.bind(this), this, fn, value) - } - } else if (value.parent) { - // save the port to communicate with the main thread - // this will be received once - this.parent = value.parent - } else if (value.kill) { - // here is time to kill this thread, just clearing the interval - if (this.interval) clearInterval(this.interval) - this.emitDestroy() + parentPort?.on('message', (value: MessageValue) => { + if (value?.data && value.id) { + // here you will receive messages + // console.log('This is the main thread ' + isMainThread) + if (this.async) { + this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this.run.bind(this), this, fn, value) } + } else if (value.parent) { + // save the port to communicate with the main thread + // this will be received once + this.parent = value.parent + } else if (value.kill) { + // here is time to kill this thread, just clearing the interval + if (this.interval) clearInterval(this.interval) + this.emitDestroy() } - ) + }) } protected checkAlive (): void { @@ -87,8 +66,8 @@ export class ThreadWorker extends AsyncResource { } protected run ( - fn: (data: Data) => Response, - value: { readonly data: Data; readonly id: number } + fn: (data?: Data) => Response, + value: 
MessageValue ): void { try { const res = fn(value.data) @@ -101,8 +80,8 @@ export class ThreadWorker extends AsyncResource { } protected runAsync ( - fn: (data: Data) => Promise, - value: { readonly data: Data; readonly id: number } + fn: (data?: Data) => Promise, + value: MessageValue ): void { fn(value.data) .then(res => { diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts new file mode 100644 index 00000000..86129901 --- /dev/null +++ b/src/worker/worker-options.ts @@ -0,0 +1,14 @@ +export interface WorkerOptions { + /** + * Maximum time (in ms) a worker may stay idle waiting for tasks. After this period of inactivity, the new workers (threads or cluster workers) created by a dynamic pool will die. + * + * @default 60000 (60 seconds) + */ + maxInactiveTime?: number + /** + * `true` if your function is asynchronous (returns a promise), else `false`. + * + * @default false + */ + async?: boolean +} diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js new file mode 100644 index 00000000..6a82b057 --- /dev/null +++ b/tests/pools/cluster/dynamic.test.js @@ -0,0 +1,90 @@ +const expect = require('expect') +const { DynamicClusterPool } = require('../../../lib/index') +const min = 1 +const max = 3 +const pool = new DynamicClusterPool( + min, + max, + './tests/worker/cluster/testWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) + +describe('Dynamic cluster pool test suite ', () => { + it('Verify that the function is executed in a worker cluster', async () => { + const result = await pool.execute({ test: 'test' }) + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { + const promises = [] + let closedWorkers = 0 + let fullPool = 0 + pool.emitter.on('FullPool', () => fullPool++) + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + 
expect(pool.workers.length).toBeLessThanOrEqual(max) + expect(pool.workers.length).toBeGreaterThan(min) + pool.workers.forEach(w => { + w.on('exit', () => { + closedWorkers++ + }) + }) + expect(fullPool > 1).toBeTruthy() + await new Promise(resolve => setTimeout(resolve, 5000)) + expect(closedWorkers).toBe(max - min) + }) + + it('Verify scale worker up and down is working', async () => { + expect(pool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + pool.execute({ test: 'test' }) + } + expect(pool.workers.length).toBeGreaterThan(min) + await new Promise(resolve => setTimeout(resolve, 3000)) + expect(pool.workers.length).toBe(min) + for (let i = 0; i < max * 10; i++) { + pool.execute({ test: 'test' }) + } + expect(pool.workers.length).toBeGreaterThan(min) + await new Promise(resolve => setTimeout(resolve, 2000)) + expect(pool.workers.length).toBe(min) + }) + it('Shutdown test', async () => { + let closedWorkers = 0 + pool.workers.forEach(w => { + w.on('exit', () => { + closedWorkers++ + }) + }) + pool.destroy() + await new Promise(resolve => setTimeout(resolve, 1000)) + expect(closedWorkers).toBe(min) + }) + + it('Validations test', () => { + let error + try { + const pool1 = new DynamicClusterPool() + console.log(pool1) + } catch (e) { + error = e + } + expect(error).toBeTruthy() + expect(error.message).toBeTruthy() + }) + + it('Should work even without opts in input', async () => { + const pool1 = new DynamicClusterPool( + 1, + 1, + './tests/worker/cluster/testWorker.js' + ) + const res = await pool1.execute({ test: 'test' }) + expect(res).toBeFalsy() + }) +}) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js new file mode 100644 index 00000000..7e319ac6 --- /dev/null +++ b/tests/pools/cluster/fixed.test.js @@ -0,0 +1,140 @@ +const expect = require('expect') +const { FixedClusterPool } = require('../../../lib/index') +const numWorkers = 10 +const pool = new FixedClusterPool( + numWorkers, + 
'./tests/worker/cluster/testWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) +const emptyPool = new FixedClusterPool( + 1, + './tests/worker/cluster/emptyWorker.js' +) +const echoPool = new FixedClusterPool(1, './tests/worker/cluster/echoWorker.js') +const errorPool = new FixedClusterPool( + 1, + './tests/worker/cluster/errorWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) + +const asyncErrorPool = new FixedClusterPool( + 1, + './tests/worker/cluster/asyncErrorWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) +const asyncPool = new FixedClusterPool( + 1, + './tests/worker/cluster/asyncWorker.js' +) + +describe('Fixed cluster pool test suite ', () => { + it('Choose worker round robin test', async () => { + const results = new Set() + for (let i = 0; i < numWorkers; i++) { + results.add(pool.chooseWorker().id) + } + expect(results.size).toBe(numWorkers) + }) + + it('Verify that the function is executed in a worker cluster', async () => { + const result = await pool.execute({ test: 'test' }) + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that is possible to invoke the execute method without input', async () => { + const result = await pool.execute() + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that is possible to have a worker that return undefined', async () => { + const result = await emptyPool.execute() + expect(result).toBeFalsy() + }) + + it('Verify that data are sent to the worker correctly', async () => { + const data = { f: 10 } + const result = await echoPool.execute(data) + expect(result).toBeTruthy() + expect(result.f).toBe(data.f) + }) + + it('Verify that error handling is working properly:sync', async () => { + const data = { f: 10 } + let inError + try { + await errorPool.execute(data) + 
} catch (e) { + inError = e + } + expect(inError).toBeDefined() + expect(typeof inError === 'string').toBeTruthy() + expect(inError).toBe('Error Message from ClusterWorker') + }) + + it('Verify that error handling is working properly:async', async () => { + const data = { f: 10 } + let inError + try { + await asyncErrorPool.execute(data) + } catch (e) { + inError = e + } + expect(inError).toBeDefined() + expect(typeof inError === 'string').toBeTruthy() + expect(inError).toBe('Error Message from ClusterWorker:async') + }) + + it('Verify that async function is working properly', async () => { + const data = { f: 10 } + const startTime = new Date().getTime() + const result = await asyncPool.execute(data) + const usedTime = new Date().getTime() - startTime + expect(result).toBeTruthy() + expect(result.f).toBe(data.f) + expect(usedTime).toBeGreaterThanOrEqual(2000) + }) + + it('Shutdown test', async () => { + let closedWorkers = 0 + pool.workers.forEach(w => { + w.on('exit', () => { + closedWorkers++ + }) + }) + pool.destroy() + await new Promise(resolve => setTimeout(resolve, 200)) + expect(closedWorkers).toBe(numWorkers) + }) + + it('Validations test', () => { + let error + try { + const pool1 = new FixedClusterPool() + console.log(pool1) + } catch (e) { + error = e + } + expect(error).toBeTruthy() + expect(error.message).toBeTruthy() + }) + + it('Should work even without opts in input', async () => { + const pool1 = new FixedClusterPool( + 1, + './tests/worker/cluster/testWorker.js' + ) + const res = await pool1.execute({ test: 'test' }) + expect(res).toBeFalsy() + }) +}) diff --git a/tests/dynamic.test.js b/tests/pools/thread/dynamic.test.js similarity index 85% rename from tests/dynamic.test.js rename to tests/pools/thread/dynamic.test.js index 5b16a361..98a0dd0f 100644 --- a/tests/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -1,11 +1,16 @@ const expect = require('expect') -const { DynamicThreadPool } = require('../lib/dynamic') +const { 
DynamicThreadPool } = require('../../../lib/index') const min = 1 const max = 3 -const pool = new DynamicThreadPool(min, max, './tests/workers/testWorker.js', { - errorHandler: e => console.error(e), - onlineHandler: () => console.log('worker is online') -}) +const pool = new DynamicThreadPool( + min, + max, + './tests/worker/thread/testWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) describe('Dynamic thread pool test suite ', () => { it('Verify that the function is executed in a worker thread', async () => { @@ -72,7 +77,11 @@ describe('Dynamic thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js') + const pool1 = new DynamicThreadPool( + 1, + 1, + './tests/worker/thread/testWorker.js' + ) const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/fixed.test.js b/tests/pools/thread/fixed.test.js similarity index 76% rename from tests/fixed.test.js rename to tests/pools/thread/fixed.test.js index b8f87264..b0cf9e57 100644 --- a/tests/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -1,17 +1,25 @@ const expect = require('expect') -const { FixedThreadPool } = require('../lib/fixed') +const { FixedThreadPool } = require('../../../lib/index') const numThreads = 10 -const pool = new FixedThreadPool(numThreads, './tests/workers/testWorker.js', { - errorHandler: e => console.error(e), - onlineHandler: () => console.log('worker is online') -}) -const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js') -const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js') -const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { - errorHandler: e => console.error(e), - onlineHandler: () => console.log('worker is online') -}) -const asyncPool = new FixedThreadPool(1, './tests/workers/asyncWorker.js') +const 
pool = new FixedThreadPool( + numThreads, + './tests/worker/thread/testWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) +const emptyPool = new FixedThreadPool(1, './tests/worker/thread/emptyWorker.js') +const echoPool = new FixedThreadPool(1, './tests/worker/thread/echoWorker.js') +const errorPool = new FixedThreadPool( + 1, + './tests/worker/thread/errorWorker.js', + { + errorHandler: e => console.error(e), + onlineHandler: () => console.log('worker is online') + } +) +const asyncPool = new FixedThreadPool(1, './tests/worker/thread/asyncWorker.js') describe('Fixed thread pool test suite ', () => { it('Choose worker round robin test', async () => { @@ -93,7 +101,7 @@ describe('Fixed thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js') + const pool1 = new FixedThreadPool(1, './tests/worker/thread/testWorker.js') const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/worker/cluster/asyncErrorWorker.js b/tests/worker/cluster/asyncErrorWorker.js new file mode 100644 index 00000000..2476853b --- /dev/null +++ b/tests/worker/cluster/asyncErrorWorker.js @@ -0,0 +1,16 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +async function error (data) { + return new Promise((resolve, reject) => { + setTimeout( + () => reject(new Error('Error Message from ClusterWorker:async')), + 2000 + ) + }) +} + +module.exports = new ClusterWorker(error, { + maxInactiveTime: 500, + async: true +}) diff --git a/tests/worker/cluster/asyncWorker.js b/tests/worker/cluster/asyncWorker.js new file mode 100644 index 00000000..975b0b36 --- /dev/null +++ b/tests/worker/cluster/asyncWorker.js @@ -0,0 +1,10 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +async function sleep (data) { + return new Promise((resolve, reject) => { + 
setTimeout(() => resolve(data), 2000) + }) +} + +module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true }) diff --git a/tests/worker/cluster/echoWorker.js b/tests/worker/cluster/echoWorker.js new file mode 100644 index 00000000..6c77bcce --- /dev/null +++ b/tests/worker/cluster/echoWorker.js @@ -0,0 +1,8 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +function echo (data) { + return data +} + +module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 }) diff --git a/tests/worker/cluster/emptyWorker.js b/tests/worker/cluster/emptyWorker.js new file mode 100644 index 00000000..62c8e2bb --- /dev/null +++ b/tests/worker/cluster/emptyWorker.js @@ -0,0 +1,6 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +function test (data) {} + +module.exports = new ClusterWorker(test, { maxInactiveTime: 500 }) diff --git a/tests/worker/cluster/errorWorker.js b/tests/worker/cluster/errorWorker.js new file mode 100644 index 00000000..87df9254 --- /dev/null +++ b/tests/worker/cluster/errorWorker.js @@ -0,0 +1,11 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') + +function error (data) { + throw new Error('Error Message from ClusterWorker') +} + +module.exports = new ClusterWorker(error, { + maxInactiveTime: 500, + async: false +}) diff --git a/tests/worker/cluster/testWorker.js b/tests/worker/cluster/testWorker.js new file mode 100644 index 00000000..fd8c5603 --- /dev/null +++ b/tests/worker/cluster/testWorker.js @@ -0,0 +1,15 @@ +'use strict' +const { ClusterWorker } = require('../../../lib/index') +const cluster = require('cluster') + +function test (data) { + for (let i = 0; i <= 50; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + return cluster.isMaster +} + +module.exports = new ClusterWorker(test, { maxInactiveTime: 500 }) diff --git a/tests/workers/asyncWorker.js b/tests/worker/thread/asyncWorker.js similarity index 79% rename from tests/workers/asyncWorker.js 
rename to tests/worker/thread/asyncWorker.js index 098774e0..25401cfb 100644 --- a/tests/workers/asyncWorker.js +++ b/tests/worker/thread/asyncWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/workers') +const { ThreadWorker } = require('../../../lib/index') async function sleep (data) { return new Promise((resolve, reject) => { diff --git a/tests/workers/echoWorker.js b/tests/worker/thread/echoWorker.js similarity index 68% rename from tests/workers/echoWorker.js rename to tests/worker/thread/echoWorker.js index 1026a369..006bf97c 100644 --- a/tests/workers/echoWorker.js +++ b/tests/worker/thread/echoWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/workers') +const { ThreadWorker } = require('../../../lib/index') function echo (data) { return data diff --git a/tests/workers/emptyWorker.js b/tests/worker/thread/emptyWorker.js similarity index 65% rename from tests/workers/emptyWorker.js rename to tests/worker/thread/emptyWorker.js index 4c7dce51..69a83a77 100644 --- a/tests/workers/emptyWorker.js +++ b/tests/worker/thread/emptyWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/workers') +const { ThreadWorker } = require('../../../lib/index') function test (data) {} diff --git a/tests/workers/errorWorker.js b/tests/worker/thread/errorWorker.js similarity index 70% rename from tests/workers/errorWorker.js rename to tests/worker/thread/errorWorker.js index c9b4f963..63a27513 100644 --- a/tests/workers/errorWorker.js +++ b/tests/worker/thread/errorWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/workers') +const { ThreadWorker } = require('../../../lib/index') function error (data) { throw new Error(data) diff --git a/tests/workers/testWorker.js b/tests/worker/thread/testWorker.js similarity index 83% rename from tests/workers/testWorker.js rename to tests/worker/thread/testWorker.js index 5c436a14..3556da01 100644 --- 
a/tests/workers/testWorker.js +++ b/tests/worker/thread/testWorker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/workers') +const { ThreadWorker } = require('../../../lib/index') const { isMainThread } = require('worker_threads') function test (data) {