From c3719753af0a9be03abf722a7543495359e817b5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 15 Dec 2023 17:59:52 +0100 Subject: [PATCH] refactor: move worker setup into worker node constructor MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 + docs/api.md | 6 -- src/index.ts | 8 +- src/pools/abstract-pool.ts | 76 +++++++++++------- src/pools/cluster/dynamic.ts | 7 +- src/pools/cluster/fixed.ts | 36 ++------- src/pools/pool.ts | 19 +++++ src/pools/thread/dynamic.ts | 7 +- src/pools/thread/fixed.ts | 31 +------- src/pools/utils.ts | 74 +++++++++++++++--- src/pools/worker-node.ts | 49 ++++++++++-- src/pools/worker.ts | 76 +++++++++++++++--- tests/pools/abstract-pool.test.mjs | 5 +- tests/pools/cluster/dynamic.test.mjs | 2 +- tests/pools/thread/dynamic.test.mjs | 2 +- tests/pools/utils.test.mjs | 23 +++++- tests/pools/worker-node.test.mjs | 113 ++++++++++++++++++++++----- 17 files changed, 376 insertions(+), 162 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a308d289..ff6fbaca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- TypeScript breaking change: merge ThreadPoolOptions and ClusterPoolOptions types into PoolOptions type. + ## [3.0.14] - 2023-12-13 ### Fixed diff --git a/docs/api.md b/docs/api.md index 28282ac2..0a139275 100644 --- a/docs/api.md +++ b/docs/api.md @@ -14,8 +14,6 @@ - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames) - [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname) - [`PoolOptions`](#pooloptions) - - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions) - - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions) - [Worker](#worker) - [`class YourWorker extends ThreadWorker/ClusterWorker`](#class-yourworker-extends-threadworkerclusterworker) - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname) @@ -141,12 +139,8 @@ An object with these properties: Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }` -#### `ThreadPoolOptions extends PoolOptions` - - `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details. -#### `ClusterPoolOptions extends PoolOptions` - - `env` (optional) - An object with the environment variables to pass to worker. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_fork_env) for more details. - `settings` (optional) - An object with the cluster settings. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_settings) for more details. diff --git a/src/index.ts b/src/index.ts index 80274289..a5ffc3a3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,6 @@ export type { AbstractPool } from './pools/abstract-pool' export { DynamicClusterPool } from './pools/cluster/dynamic' -export { - FixedClusterPool, - type ClusterPoolOptions -} from './pools/cluster/fixed' +export { FixedClusterPool } from './pools/cluster/fixed' export { PoolEvents, PoolTypes } from './pools/pool' export type { IPool, @@ -26,6 +23,7 @@ export type { StrategyData, TaskStatistics, WorkerInfo, + WorkerNodeOptions, WorkerType, WorkerUsage } from './pools/worker' @@ -45,7 +43,7 @@ export type { } from './pools/selection-strategies/selection-strategies-types' export type { WorkerChoiceStrategyContext } from './pools/selection-strategies/worker-choice-strategy-context' export { DynamicThreadPool } from './pools/thread/dynamic' -export { FixedThreadPool, type ThreadPoolOptions } from './pools/thread/fixed' +export { FixedThreadPool } from './pools/thread/fixed' export type { AbstractWorker } from './worker/abstract-worker' export { ClusterWorker } from './worker/cluster-worker' export { ThreadWorker } from './worker/thread-worker' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9b0ec92f..56be9a1a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1258,26 +1258,27 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): void - /** - * Creates a new worker. - * - * @returns Newly created worker. - */ - protected abstract createWorker (): Worker - /** * Creates a new, completely set up worker node. * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { - const worker = this.createWorker() - - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler('error', (error: Error) => { + const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker) this.flagWorkerNodeAsNotReady(workerNodeKey) const workerInfo = this.getWorkerInfo(workerNodeKey) this.emitter?.emit(PoolEvents.error, error) @@ -1298,15 +1299,15 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(workerNodeKey) } }) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode.worker) }) - - const workerNodeKey = this.addWorkerNode(worker) - + const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) - return workerNodeKey } @@ -1806,23 +1807,38 @@ export abstract class AbstractPool< } /** - * Adds the given worker in the pool worker nodes. + * Creates a worker node. * - * @param worker - The worker. - * @returns The added worker node key. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + * @returns The created worker node. */ - private addWorkerNode (worker: Worker): number { + private createWorkerNode (): IWorkerNode { const workerNode = new WorkerNode( - worker, - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + } ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } + return workerNode + } + + /** + * Adds the given worker node in the pool worker nodes. + * + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + */ + private addWorkerNode (workerNode: IWorkerNode): number { this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey === -1) { throw new Error('Worker added not found in worker nodes') } @@ -1830,7 +1846,7 @@ export abstract class AbstractPool< } /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node associated to the give given worker from the pool worker nodes. * * @param worker - The worker. */ diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 99a89966..32aad22a 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,6 +1,7 @@ -import { type PoolType, PoolTypes } from '../pool' +import { type Worker } from 'node:cluster' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' import { checkDynamicPoolSize } from '../utils' -import { type ClusterPoolOptions, FixedClusterPool } from './fixed' +import { FixedClusterPool } from './fixed' /** * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers. @@ -29,7 +30,7 @@ export class DynamicClusterPool< min: number, protected readonly max: number, filePath: string, - opts: ClusterPoolOptions = {} + opts: PoolOptions = {} ) { super(min, filePath, opts) checkDynamicPoolSize(this.numberOfWorkers, this.max) diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index b74f034b..f3fb54bb 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -1,27 +1,9 @@ -import cluster, { type ClusterSettings, type Worker } from 'node:cluster' +import cluster, { type Worker } from 'node:cluster' import type { MessageValue } from '../../utility-types' import { AbstractPool } from '../abstract-pool' import { type PoolOptions, type PoolType, PoolTypes } from '../pool' import { type WorkerType, WorkerTypes } from '../worker' -/** - * Options for a poolifier cluster pool. - */ -export interface ClusterPoolOptions extends PoolOptions { - /** - * Key/value pairs to add to worker process environment. - * - * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env - */ - env?: Record - /** - * Cluster settings. - * - * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings - */ - settings?: ClusterSettings -} - /** * A cluster pool with a fixed number of workers. * @@ -44,7 +26,7 @@ export class FixedClusterPool< public constructor ( numberOfWorkers: number, filePath: string, - protected readonly opts: ClusterPoolOptions = {} + protected readonly opts: PoolOptions = {} ) { super(numberOfWorkers, filePath, opts) } @@ -65,18 +47,17 @@ export class FixedClusterPool< this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] - const worker = workerNode.worker const waitWorkerExit = new Promise(resolve => { - worker.once('exit', () => { + workerNode.registerOnceWorkerEventHandler('exit', () => { resolve() }) }) - worker.once('disconnect', () => { - worker.kill() + workerNode.registerOnceWorkerEventHandler('disconnect', () => { + workerNode.worker.kill() }) await this.sendKillMessageToWorker(workerNodeKey) workerNode.removeAllListeners() - worker.disconnect() + workerNode.worker.disconnect() await waitWorkerExit } @@ -122,11 +103,6 @@ export class FixedClusterPool< this.workerNodes[workerNodeKey].worker.off('message', listener) } - /** @inheritDoc */ - protected createWorker (): Worker { - return cluster.fork(this.opts.env) - } - /** @inheritDoc */ protected get type (): PoolType { return PoolTypes.fixed diff --git a/src/pools/pool.ts b/src/pools/pool.ts index b1b504d5..856473ff 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,5 +1,6 @@ import type { TransferListItem } from 'node:worker_threads' import type { EventEmitterAsyncResource } from 'node:events' +import type { ClusterSettings } from 'node:cluster' import type { TaskFunction } from '../worker/task-functions' import type { ErrorHandler, @@ -189,6 +190,24 @@ export interface PoolOptions { * Pool worker node tasks queue options. */ tasksQueueOptions?: TasksQueueOptions + /** + * Worker options. + * + * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options + */ + workerOptions?: WorkerOptions + /** + * Key/value pairs to add to worker process environment. + * + * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env + */ + env?: Record + /** + * Cluster settings. + * + * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings + */ + settings?: ClusterSettings } /** diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 119e556d..6def273e 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,6 +1,7 @@ -import { type PoolType, PoolTypes } from '../pool' +import { type Worker } from 'node:worker_threads' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' import { checkDynamicPoolSize } from '../utils' -import { FixedThreadPool, type ThreadPoolOptions } from './fixed' +import { FixedThreadPool } from './fixed' /** * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. @@ -29,7 +30,7 @@ export class DynamicThreadPool< min: number, protected readonly max: number, filePath: string, - opts: ThreadPoolOptions = {} + opts: PoolOptions = {} ) { super(min, filePath, opts) checkDynamicPoolSize(this.numberOfWorkers, this.max) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 28f8fceb..e5074e39 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -1,10 +1,8 @@ import { type MessageChannel, type MessagePort, - SHARE_ENV, type TransferListItem, - Worker, - type WorkerOptions, + type Worker, isMainThread } from 'node:worker_threads' import type { MessageValue } from '../../utility-types' @@ -12,18 +10,6 @@ import { AbstractPool } from '../abstract-pool' import { type PoolOptions, type PoolType, PoolTypes } from '../pool' import { type WorkerType, WorkerTypes } from '../worker' -/** - * Options for a poolifier thread pool. - */ -export interface ThreadPoolOptions extends PoolOptions { - /** - * Worker options. - * - * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options - */ - workerOptions?: WorkerOptions -} - /** * A thread pool with a fixed number of threads. * @@ -46,7 +32,7 @@ export class FixedThreadPool< public constructor ( numberOfThreads: number, filePath: string, - protected readonly opts: ThreadPoolOptions = {} + protected readonly opts: PoolOptions = {} ) { super(numberOfThreads, filePath, opts) } @@ -62,16 +48,15 @@ export class FixedThreadPool< this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] - const worker = workerNode.worker const waitWorkerExit = new Promise(resolve => { - worker.once('exit', () => { + workerNode.registerOnceWorkerEventHandler('exit', () => { resolve() }) }) await this.sendKillMessageToWorker(workerNodeKey) workerNode.closeChannel() workerNode.removeAllListeners() - await worker.terminate() + await workerNode.worker.terminate() await waitWorkerExit } @@ -135,14 +120,6 @@ export class FixedThreadPool< ) } - /** @inheritDoc */ - protected createWorker (): Worker { - return new Worker(this.filePath, { - env: SHARE_ENV, - ...this.opts.workerOptions - }) - } - /** @inheritDoc */ protected get type (): PoolType { return PoolTypes.fixed diff --git a/src/pools/utils.ts b/src/pools/utils.ts index eb800c4a..11c30ad1 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,4 +1,6 @@ import { existsSync } from 'node:fs' +import cluster from 'node:cluster' +import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads' import { average, isPlainObject, max, median, min } from '../utils' import { type MeasurementStatisticsRequirements, @@ -6,9 +8,21 @@ import { type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' import type { TasksQueueOptions } from './pool' -import type { IWorker, MeasurementStatistics } from './worker' +import { + type IWorker, + type MeasurementStatistics, + type WorkerNodeOptions, + type WorkerType, + WorkerTypes +} from './worker' export const checkFilePath = (filePath: string): void => { + if (filePath == null) { + throw new TypeError('The worker file path must be specified') + } + if (typeof filePath !== 'string') { + throw new TypeError('The worker file path must be a string') + } if (!existsSync(filePath)) { throw new Error(`Cannot find the worker file '${filePath}'`) } @@ -86,26 +100,43 @@ export const checkValidTasksQueueOptions = ( } } -export const checkWorkerNodeArguments = ( - worker: Worker, - tasksQueueBackPressureSize: number +export const checkWorkerNodeArguments = ( + type: WorkerType, + filePath: string, + opts: WorkerNodeOptions ): void => { - if (worker == null) { - throw new TypeError('Cannot construct a worker node without a worker') + if (type == null) { + throw new TypeError('Cannot construct a worker node without a worker type') + } + if (!Object.values(WorkerTypes).includes(type)) { + throw new TypeError( + `Cannot construct a worker node with an invalid worker type '${type}'` + ) } - if (tasksQueueBackPressureSize == null) { + checkFilePath(filePath) + if (opts == null) { throw new TypeError( - 'Cannot construct a worker node without a tasks queue back pressure size' + 'Cannot construct a worker node without worker node options' ) } - if (!Number.isSafeInteger(tasksQueueBackPressureSize)) { + if (opts != null && !isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node with invalid options: must be a plain object' ) } - if (tasksQueueBackPressureSize <= 0) { + if (opts.tasksQueueBackPressureSize == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue back pressure size option' + ) + } + if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + } + if (opts.tasksQueueBackPressureSize <= 0) { throw new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) } } @@ -153,3 +184,22 @@ export const updateMeasurementStatistics = ( } } } + +export const createWorker = ( + type: WorkerType, + filePath: string, + opts: { env?: Record, workerOptions?: WorkerOptions } +): Worker => { + switch (type) { + case WorkerTypes.thread: + return new Worker(filePath, { + env: SHARE_ENV, + ...opts?.workerOptions + }) as unknown as Worker + case WorkerTypes.cluster: + return cluster.fork(opts?.env) as unknown as Worker + default: + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Unknown worker type '${type}'`) + } +} diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 59c4de7a..9a98458e 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -5,15 +5,20 @@ import type { Task } from '../utility-types' import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils' import { Deque } from '../deque' import { + type ErrorHandler, + type ExitHandler, type IWorker, type IWorkerNode, + type MessageHandler, + type OnlineHandler, type StrategyData, type WorkerInfo, + type WorkerNodeOptions, type WorkerType, WorkerTypes, type WorkerUsage } from './worker' -import { checkWorkerNodeArguments } from './utils' +import { checkWorkerNodeArguments, createWorker } from './utils' /** * Worker node. @@ -43,19 +48,23 @@ export class WorkerNode /** * Constructs a new worker node. * - * @param worker - The worker. - * @param tasksQueueBackPressureSize - The tasks queue back pressure size. + * @param type - The worker type. + * @param filePath - The worker file path. + * @param opts - The worker node options. */ - constructor (worker: Worker, tasksQueueBackPressureSize: number) { + constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) { super() - checkWorkerNodeArguments(worker, tasksQueueBackPressureSize) - this.worker = worker - this.info = this.initWorkerInfo(worker) + checkWorkerNodeArguments(type, filePath, opts) + this.worker = createWorker(type, filePath, { + env: opts.env, + workerOptions: opts.workerOptions + }) + this.info = this.initWorkerInfo(this.worker) this.usage = this.initWorkerUsage() if (this.info.type === WorkerTypes.thread) { this.messageChannel = new MessageChannel() } - this.tasksQueueBackPressureSize = tasksQueueBackPressureSize + this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize this.tasksQueue = new Deque>() this.onBackPressureStarted = false this.taskFunctionsUsage = new Map() @@ -125,6 +134,30 @@ export class WorkerNode } } + /** @inheritdoc */ + public registerWorkerEventHandler ( + event: string, + listener: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ): void { + this.worker.on(event, listener) + } + + /** @inheritdoc */ + public registerOnceWorkerEventHandler ( + event: string, + listener: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ): void { + this.worker.once(event, listener) + } + /** @inheritdoc */ public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined { if (!Array.isArray(this.info.taskFunctionNames)) { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 5439606d..1a94ef60 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -208,27 +208,39 @@ export interface IWorker { * @param event - The event. * @param handler - The event handler. */ - readonly on: ((event: 'online', handler: OnlineHandler) => void) & - ((event: 'message', handler: MessageHandler) => void) & - ((event: 'error', handler: ErrorHandler) => void) & - ((event: 'exit', handler: ExitHandler) => void) - /** - * Registers a listener to the exit event that will only be performed once. + readonly on: ( + event: string, + handler: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ) => void + /** + * Registers once an event listener. * - * @param event - The `'exit'` event. - * @param handler - The exit handler. + * @param event - The event. + * @param handler - The event handler. */ - readonly once: (event: 'exit', handler: ExitHandler) => void + readonly once: ( + event: string, + handler: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ) => void } /** - * Worker node event detail. + * Worker node options. * * @internal */ -export interface WorkerNodeEventDetail { - workerId: number - workerNodeKey?: number +export interface WorkerNodeOptions { + workerOptions?: WorkerOptions + env?: Record + tasksQueueBackPressureSize: number } /** @@ -316,6 +328,34 @@ export interface IWorkerNode * Closes communication channel. */ readonly closeChannel: () => void + /** + * Registers a worker event handler. + * + * @param event - The event. + * @param listener - The event listener. + */ + readonly registerWorkerEventHandler: ( + event: string, + listener: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ) => void + /** + * Registers once a worker event handler. + * + * @param event - The event. + * @param listener - The event listener. + */ + readonly registerOnceWorkerEventHandler: ( + event: string, + listener: + | OnlineHandler + | MessageHandler + | ErrorHandler + | ExitHandler + ) => void /** * Gets task function worker usage statistics. * @@ -331,3 +371,13 @@ export interface IWorkerNode */ readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean } + +/** + * Worker node event detail. + * + * @internal + */ +export interface WorkerNodeEventDetail { + workerId: number + workerNodeKey?: number +} diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index edf03f4a..5dc28935 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -78,7 +78,10 @@ describe('Abstract pool test suite', () => { it('Verify that filePath is checked', () => { expect(() => new FixedThreadPool(numberOfWorkers)).toThrow( - new Error("Cannot find the worker file 'undefined'") + new TypeError('The worker file path must be specified') + ) + expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow( + new TypeError('The worker file path must be a string') ) expect( () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts') diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index 6e8580e9..2a654b6a 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -76,7 +76,7 @@ describe('Dynamic cluster pool test suite', () => { it('Validation of inputs test', () => { expect(() => new DynamicClusterPool(min)).toThrow( - "Cannot find the worker file 'undefined'" + 'The worker file path must be specified' ) }) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index efcc417e..f7329710 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -76,7 +76,7 @@ describe('Dynamic thread pool test suite', () => { it('Validation of inputs test', () => { expect(() => new DynamicThreadPool(min)).toThrow( - "Cannot find the worker file 'undefined'" + 'The worker file path must be specified' ) }) diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index bf0c72af..5feb6596 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -1,9 +1,15 @@ +import { Worker as ThreadWorker } from 'node:worker_threads' +import { Worker as ClusterWorker } from 'node:cluster' import { expect } from 'expect' import { CircularArray, DEFAULT_CIRCULAR_ARRAY_SIZE } from '../../lib/circular-array.js' -import { updateMeasurementStatistics } from '../../lib/pools/utils.js' +import { + createWorker, + updateMeasurementStatistics +} from '../../lib/pools/utils.js' +import { WorkerTypes } from '../../lib/index.js' describe('Pool utils test suite', () => { it('Verify updateMeasurementStatistics() behavior', () => { @@ -92,4 +98,19 @@ describe('Pool utils test suite', () => { ) }) }) + + it('Verify createWorker() behavior', () => { + expect( + createWorker( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs' + ) + ).toBeInstanceOf(ThreadWorker) + expect( + createWorker( + WorkerTypes.cluster, + './tests/worker-files/cluster/testWorker.mjs' + ) + ).toBeInstanceOf(ClusterWorker) + }) }) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index f70ce37b..cfca3422 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -1,5 +1,4 @@ -import { MessageChannel, Worker } from 'node:worker_threads' -import cluster from 'node:cluster' +import { MessageChannel } from 'node:worker_threads' import { expect } from 'expect' import { WorkerNode } from '../../lib/pools/worker-node.js' import { WorkerTypes } from '../../lib/index.js' @@ -8,46 +7,119 @@ import { Deque } from '../../lib/deque.js' import { DEFAULT_TASK_NAME } from '../../lib/utils.js' describe('Worker node test suite', () => { - const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs') - const clusterWorker = cluster.fork() - const threadWorkerNode = new WorkerNode(threadWorker, 12) - const clusterWorkerNode = new WorkerNode(clusterWorker, 12) + const threadWorkerNode = new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12 } + ) + const clusterWorkerNode = new WorkerNode( + WorkerTypes.cluster, + './tests/worker-files/cluster/testWorker.js', + { tasksQueueBackPressureSize: 12 } + ) it('Worker node instantiation', () => { expect(() => new WorkerNode()).toThrow( - new TypeError('Cannot construct a worker node without a worker') + new TypeError('Cannot construct a worker node without a worker type') ) - expect(() => new WorkerNode(threadWorker)).toThrow( + expect( + () => + new WorkerNode( + 'invalidWorkerType', + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12 } + ) + ).toThrow( + new TypeError( + "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'" + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs' + ) + ).toThrow( new TypeError( - 'Cannot construct a worker node without a tasks queue back pressure size' + 'Cannot construct a worker node without worker node options' ) ) expect( - () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize') + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + '' + ) ).toThrow( new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node with invalid options: must be a plain object' ) ) - expect(() => new WorkerNode(threadWorker, 0.2)).toThrow( + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + {} + ) + ).toThrow( new TypeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' + 'Cannot construct a worker node without a tasks queue back pressure size option' ) ) - expect(() => new WorkerNode(threadWorker, 0)).toThrow( + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 0.2 } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 0 } + ) + ).toThrow( new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) ) - expect(() => new WorkerNode(threadWorker, -1)).toThrow( + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: -1 } + ) + ).toThrow( new RangeError( - 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer' + 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) - expect(threadWorkerNode.worker).toBe(threadWorker) expect(threadWorkerNode.info).toStrictEqual({ - id: threadWorker.threadId, + id: threadWorkerNode.worker.threadId, type: WorkerTypes.thread, dynamic: false, ready: false @@ -88,9 +160,8 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) - expect(clusterWorkerNode.worker).toBe(clusterWorker) expect(clusterWorkerNode.info).toStrictEqual({ - id: clusterWorker.id, + id: clusterWorkerNode.worker.id, type: WorkerTypes.cluster, dynamic: false, ready: false -- 2.34.1