From a86b6df187001e7e2e5c248ddb828286f985096c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 4 May 2023 22:04:27 +0200 Subject: [PATCH] feat: support multiple functions per worker MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Close #550 Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ README.md | 4 +- examples/multiFunctionExample.js | 4 +- examples/multifunctionWorker.js | 18 +++---- src/pools/abstract-pool.ts | 3 +- src/pools/pool.ts | 3 +- src/pools/worker.ts | 4 ++ src/utility-types.ts | 13 +++++ src/worker/abstract-worker.ts | 81 +++++++++++++++++++--------- src/worker/cluster-worker.ts | 14 +++-- src/worker/thread-worker.ts | 8 ++- src/worker/worker-options.ts | 1 + tests/worker/abstract-worker.test.js | 50 ++++++++++++----- 13 files changed, 149 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 631e2d66..b3e71fc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Support multiple task functions per worker. + ## [2.4.11] - 2023-04-23 ### Changed diff --git a/README.md b/README.md index 857d9c26..7767c75c 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ pool.execute({}).then(res => { You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool. -**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**. +**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**. **Now TypeScript is also supported, find how to use it into the example folder**. Remember that workers can only send and receive serializable data. @@ -209,7 +209,7 @@ This method will call the terminate method on each worker. ### `class YourWorker extends ThreadWorker/ClusterWorker` -`fn` (mandatory) The function that you want to execute on the worker +`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker `opts` (optional) An object with these properties: - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die. diff --git a/examples/multiFunctionExample.js b/examples/multiFunctionExample.js index 31fd8809..213736ea 100644 --- a/examples/multiFunctionExample.js +++ b/examples/multiFunctionExample.js @@ -5,11 +5,11 @@ const pool = new FixedThreadPool(15, './multiFunctionWorker.js', { }) pool - .execute({ functionName: 'fn0', input: 'hello' }) + .execute({ text: 'hello' }, 'fn0') .then(res => console.log(res)) .catch(err => console.error(err)) pool - .execute({ functionName: 'fn1', input: 'multiple functions' }) + .execute({ text: 'multiple functions' }, 'fn1') .then(res => console.log(res)) .catch(err => console.error(err)) diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js index 61369cfa..a38d8bb0 100644 --- a/examples/multifunctionWorker.js +++ b/examples/multifunctionWorker.js @@ -1,14 +1,14 @@ 'use strict' const { ThreadWorker } = require('poolifier') -function yourFunction (data) { - if (data.functionName === 'fn0') { - console.log('Executing function 0') - return { data: '0 your input was' + data.input } - } else if (data.functionName === 'fn1') { - console.log('Executing function 1') - return { data: '1 your input was' + data.input } - } +function fn0 (data) { + console.log('Executing function 0') + return { data: 'fn0 your input was' + data.text } } -module.exports = new ThreadWorker(yourFunction) +function fn1 (data) { + console.log('Executing function 1') + return { data: 'fn1 your input was' + data.text } +} + +module.exports = new ThreadWorker({ fn0, fn1 }) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2ee3a0cc..298c0334 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -304,9 +304,10 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public async execute (data?: Data): Promise { + public async execute (data?: Data, name?: string): Promise { const [workerNodeKey, workerNode] = this.chooseWorkerNode() const submittedTask: Task = { + name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), id: crypto.randomUUID() diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f6188d8e..992aaaf1 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -144,9 +144,10 @@ export interface IPool< * Executes the function specified in the worker constructor with the task data input parameter. * * @param data - The task input data for the specified worker function. This can only be serializable data. + * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed. * @returns Promise that will be fulfilled when the task is completed. */ - execute: (data?: Data) => Promise + execute: (data?: Data, name?: string) => Promise /** * Shutdowns every current worker in this pool. */ diff --git a/src/pools/worker.ts b/src/pools/worker.ts index adec0138..d2d2a56f 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -36,6 +36,10 @@ export type ExitHandler = ( * @internal */ export interface Task { + /** + * Task name. + */ + readonly name?: string /** * Task input data that will be passed to the worker. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index ea90afd6..54494354 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -68,6 +68,19 @@ export type WorkerAsyncFunction = ( export type WorkerFunction = | WorkerSyncFunction | WorkerAsyncFunction +/** + * Worker functions object that can be executed. + * This object can contain synchronous or asynchronous functions. + * The key is the name of the function. + * The value is the function itself. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type TaskFunctions = Record< +string, +WorkerFunction +> /** * An object holding the execution response promise resolve/reject callbacks. diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index fbf99ab8..788c5dfa 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,11 +1,12 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' -import type { - MessageValue, - WorkerAsyncFunction, - WorkerFunction, - WorkerSyncFunction +import { + type MessageValue, + type TaskFunctions, + type WorkerAsyncFunction, + type WorkerFunction, + type WorkerSyncFunction } from '../utility-types' import { EMPTY_FUNCTION } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' @@ -26,6 +27,10 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { + /** + * Task function(s) processed by the worker when the pool's `execution` function is invoked. + */ + protected taskFunctions!: Map> /** * Timestamp of the last task processed by this worker. */ @@ -39,14 +44,16 @@ export abstract class AbstractWorker< * * @param type - The type of async event. * @param isMain - Whether this is the main worker or not. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. * @param mainWorker - Reference to main worker. * @param opts - Options for the worker. */ public constructor ( type: string, protected readonly isMain: boolean, - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, protected mainWorker: MainWorker | undefined | null, protected readonly opts: WorkerOptions = { /** @@ -62,7 +69,7 @@ export abstract class AbstractWorker< ) { super(type) this.checkWorkerOptions(this.opts) - this.checkFunctionInput(fn) + this.checkTaskFunctions(taskFunctions) if (!this.isMain) { this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( @@ -75,7 +82,7 @@ export abstract class AbstractWorker< this.mainWorker?.on( 'message', (message: MessageValue) => { - this.messageListener(message, fn) + this.messageListener(message) } ) } @@ -88,19 +95,41 @@ export abstract class AbstractWorker< } /** - * Checks if the `fn` parameter is passed to the constructor. + * Checks if the `taskFunctions` parameter is passed to the constructor. * - * @param fn - The function that should be defined. + * @param taskFunctions - The task function(s) that should be defined. */ - private checkFunctionInput (fn: WorkerFunction): void { - if (fn == null) throw new Error('fn parameter is mandatory') - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') + private checkTaskFunctions ( + taskFunctions: + | WorkerFunction + | TaskFunctions + ): void { + if (taskFunctions == null) { throw new Error('taskFunctions parameter is mandatory') } + if ( + typeof taskFunctions !== 'function' && + typeof taskFunctions !== 'object' + ) { + throw new Error('taskFunctions parameter is not a function or an object') } - if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) { - throw new Error( - 'fn parameter is an async function, please set the async option to true' - ) + if ( + typeof taskFunctions === 'object' && + taskFunctions.constructor !== Object && + Object.prototype.toString.call(taskFunctions) !== '[object Object]' + ) { + throw new Error('taskFunctions parameter is not an object literal') + } + this.taskFunctions = new Map>() + if (typeof taskFunctions !== 'function') { + for (const [name, fn] of Object.entries(taskFunctions)) { + if (typeof fn !== 'function') { + throw new Error( + 'A taskFunctions parameter object value is not a function' + ) + } + this.taskFunctions.set(name, fn.bind(this)) + } + } else { + this.taskFunctions.set('default', taskFunctions.bind(this)) } } @@ -108,15 +137,17 @@ export abstract class AbstractWorker< * Worker message listener. * * @param message - Message received. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. */ - protected messageListener ( - message: MessageValue, - fn: WorkerFunction - ): void { + protected messageListener (message: MessageValue): void { if (message.id != null && message.data != null) { + let fn: WorkerFunction | undefined + if (message.name == null) { + fn = this.taskFunctions.get('default') + } else { + fn = this.taskFunctions.get(message.name) + } // Task message received - if (this.opts.async === true) { + if (fn?.constructor.name === 'AsyncFunction') { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) } else { this.runInAsyncScope(this.run.bind(this), this, fn, message) diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 8725dc89..49b0f984 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -1,6 +1,10 @@ import type { Worker } from 'node:cluster' import cluster from 'node:cluster' -import type { MessageValue, WorkerFunction } from '../utility-types' +import type { + MessageValue, + TaskFunctions, + WorkerFunction +} from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -25,17 +29,19 @@ export class ClusterWorker< /** * Constructs a new poolifier cluster worker. * - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. * @param opts - Options for the worker. */ public constructor ( - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, opts: WorkerOptions = {} ) { super( 'worker-cluster-pool:poolifier', cluster.isPrimary, - fn, + taskFunctions, cluster.worker, opts ) diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 03566a4c..ce17c59a 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,6 +1,10 @@ import type { MessagePort } from 'node:worker_threads' import { isMainThread, parentPort } from 'node:worker_threads' -import type { MessageValue, WorkerFunction } from '../utility-types' +import type { + MessageValue, + TaskFunctions, + WorkerFunction +} from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -29,7 +33,7 @@ export class ThreadWorker< * @param opts - Options for the worker. */ public constructor ( - fn: WorkerFunction, + fn: WorkerFunction | TaskFunctions, opts: WorkerOptions = {} ) { super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts) diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 9ef50ade..791f915d 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -53,6 +53,7 @@ export interface WorkerOptions { * Whether your worker will perform asynchronous or not. * * @defaultValue false + * @deprecated This option will be removed in the next major version. */ async?: boolean /** diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index b62ba942..25e09723 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -27,26 +27,52 @@ describe('Abstract worker test suite', () => { expect(worker.opts.async).toBe(true) }) - it('Verify that fn parameter is mandatory', () => { - expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory') + it('Verify that taskFunctions parameter is mandatory', () => { + expect(() => new ClusterWorker()).toThrowError( + 'taskFunctions parameter is mandatory' + ) }) - it('Verify that fn parameter is a function', () => { - expect(() => new ClusterWorker({})).toThrowError( - new TypeError('fn parameter is not a function') + it('Verify that taskFunctions parameter is a function or an object', () => { + expect(() => new ClusterWorker(0)).toThrowError( + new TypeError('taskFunctions parameter is not a function or an object') ) expect(() => new ClusterWorker('')).toThrowError( - new TypeError('fn parameter is not a function') + new TypeError('taskFunctions parameter is not a function or an object') + ) + expect(() => new ClusterWorker(true)).toThrowError( + new TypeError('taskFunctions parameter is not a function or an object') ) }) - it('Verify that async fn parameter without async option throw error', () => { - const fn = async () => { - return new Promise() - } - expect(() => new ClusterWorker(fn)).toThrowError( - 'fn parameter is an async function, please set the async option to true' + it('Verify that taskFunctions parameter is an object literal', () => { + expect(() => new ClusterWorker([])).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new Map())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new Set())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new WeakMap())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') ) + expect(() => new ClusterWorker(new WeakSet())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + }) + + it('Verify that taskFunctions parameter with multiple task functions is taken', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const worker = new ClusterWorker({ fn1, fn2 }) + expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true) + expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true) }) it('Verify that handleError() method is working properly', () => { -- 2.34.1