From: Jérôme Benoit Date: Sun, 17 Sep 2023 19:34:30 +0000 (+0200) Subject: Merge branch 'master' into feature/task-functions X-Git-Tag: v2.7.0~1^2~20 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=059cf20e59d680db96e1d812dd6320ba9af55c1c;hp=9b38ab2d1a0473ecc714895e538a81e381a383ad;p=poolifier.git Merge branch 'master' into feature/task-functions --- diff --git a/CHANGELOG.md b/CHANGELOG.md index ad5bfde0..5f5bda97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API. + +### Added + +- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API. - Stricter worker constructor arguments validation. ## [2.6.45] - 2023-09-17 diff --git a/docs/api.md b/docs/api.md index c8a135f1..7bddac43 100644 --- a/docs/api.md +++ b/docs/api.md @@ -8,7 +8,7 @@ - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) - [`pool.start()`](#poolstart) - [`pool.destroy()`](#pooldestroy) - - [`pool.listTaskFunctions()`](#poollisttaskfunctions) + - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames) - [`PoolOptions`](#pooloptions) - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions) - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions) @@ -17,7 +17,7 @@ - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname) - [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn) - [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname) - - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions) + - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames) - [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname) ## Pool @@ -51,7 +51,7 @@ This method is available on both pool implementations and will start the minimum This method is available on both pool implementations and will call the terminate method on each worker. -### `pool.listTaskFunctions()` +### `pool.listTaskFunctionNames()` This method is available on both pool implementations and returns an array of the task function names. @@ -163,7 +163,7 @@ This method is available on both worker implementations and returns a boolean. This method is available on both worker implementations and returns a boolean. -#### `YourWorker.listTaskFunctions()` +#### `YourWorker.listTaskFunctionNames()` This method is available on both worker implementations and returns an array of the task function names. diff --git a/src/index.ts b/src/index.ts index 08943274..8faba255 100644 --- a/src/index.ts +++ b/src/index.ts @@ -60,6 +60,7 @@ export type { export type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './worker/task-functions' @@ -67,7 +68,7 @@ export type { MessageValue, PromiseResponseWrapper, Task, - TaskError, + WorkerError, TaskPerformance, WorkerStatistics, Writable diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index cbc24e7d..3d10391c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,6 +21,7 @@ import { updateMeasurementStatistics } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, PoolEmitter, @@ -91,6 +92,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is started or not. */ @@ -144,6 +152,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.started = false this.starting = false if (this.opts.startWorkers === true) { @@ -593,7 +603,7 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( workerNode => workerNode.info.id === workerId ) @@ -754,19 +764,128 @@ export abstract class AbstractPool< ) } + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + }) + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId as number}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, message) + } + }) + } + + /** @inheritDoc */ + public hasTaskFunction (name: string): boolean { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) + ) { + return true + } + } + return false + } + + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + taskFunction: TaskFunction + ): Promise { + this.taskFunctions.set(name, taskFunction) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: taskFunction.toString() + }) + } + + /** @inheritDoc */ + public async removeTaskFunction (name: string): Promise { + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + } + /** @inheritDoc */ - public listTaskFunctions (): string[] { + public listTaskFunctionNames (): string[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctions) && - workerNode.info.taskFunctions.length > 0 + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 ) { - return workerNode.info.taskFunctions + return workerNode.info.taskFunctionNames } } return [] } + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -810,7 +929,6 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -858,18 +976,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -969,8 +1092,8 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 2 + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 ) } @@ -985,7 +1108,7 @@ export abstract class AbstractPool< ) { --workerTaskStatistics.executing } - if (message.taskError == null) { + if (message.workerError == null) { ++workerTaskStatistics.executed } else { ++workerTaskStatistics.failed @@ -996,7 +1119,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } updateMeasurementStatistics( @@ -1023,7 +1146,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = @@ -1173,9 +1296,19 @@ export abstract class AbstractPool< }) const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { - checkActive: true, - workerId: workerInfo.id as number + checkActive: true }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1245,8 +1378,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1262,11 +1394,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1296,7 +1424,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1310,10 +1437,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1347,10 +1471,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1372,42 +1493,44 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctions != null) { + if (message.ready != null && message.taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctions != null) { - // Task functions message received from worker + } else if (message.taskFunctionNames != null) { + // Task function names message received from worker this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctions = message.taskFunctions + ).taskFunctionNames = message.taskFunctionNames } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ) workerInfo.ready = message.ready as boolean - workerInfo.taskFunctions = message.taskFunctions + workerInfo.taskFunctionNames = message.taskFunctionNames if (this.ready) { this.emitter?.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, taskError, data } = message + const { taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (taskError != null) { - this.emitter?.emit(PoolEvents.taskError, taskError) - promiseResponse.reject(taskError.message) + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + promiseResponse.reject(workerError.message) } else { promiseResponse.resolve(data as Response) } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 17ef7e9e..9470cccd 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -73,10 +73,7 @@ export class FixedClusterPool< worker.on('disconnect', () => { worker.kill() }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) worker.disconnect() await waitWorkerExit } @@ -86,14 +83,16 @@ export class FixedClusterPool< workerNodeKey: number, message: MessageValue ): void { - this.workerNodes[workerNodeKey].worker.send(message) + this.workerNodes[workerNodeKey].worker.send({ + ...message, + workerId: this.workerNodes[workerNodeKey].info.id as number + }) } /** @inheritDoc */ protected sendStartupMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { - ready: false, - workerId: this.workerNodes[workerNodeKey].info.id as number + ready: false }) } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 711a71be..212253ff 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'node:events' import { type TransferListItem } from 'node:worker_threads' +import type { TaskFunction } from '../worker/task-functions' import type { ErrorHandler, ExitHandler, @@ -260,12 +261,45 @@ export interface IPool< * Terminates all workers in this pool. */ readonly destroy: () => Promise + /** + * Whether the specified task function exists in this pool. + * + * @param name - The name of the task function. + * @returns `true` if the task function exists, `false` otherwise. + */ + readonly hasTaskFunction: (name: string) => boolean + /** + * Adds a task function to this pool. + * If a task function with the same name already exists, it will be overwritten. + * + * @param name - The name of the task function. + * @param taskFunction - The task function. + * @returns `true` if the task function was added, `false` otherwise. + */ + readonly addTaskFunction: ( + name: string, + taskFunction: TaskFunction + ) => Promise + /** + * Removes a task function from this pool. + * + * @param name - The name of the task function. + * @returns `true` if the task function was removed, `false` otherwise. + */ + readonly removeTaskFunction: (name: string) => Promise /** * Lists the names of task function available in this pool. * * @returns The names of task function available in this pool. */ - readonly listTaskFunctions: () => string[] + readonly listTaskFunctionNames: () => string[] + /** + * Sets the default task function in this pool. + * + * @param name - The name of the task function. + * @returns `true` if the default task function was set, `false` otherwise. + */ + readonly setDefaultTaskFunction: (name: string) => Promise /** * Sets the worker choice strategy in this pool. * diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 6e234e2e..2ad94a7b 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,10 +67,7 @@ export class FixedThreadPool< resolve() }) }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) workerNode.closeChannel() await worker.terminate() await waitWorkerExit @@ -84,16 +81,18 @@ export class FixedThreadPool< ): void { ( this.workerNodes[workerNodeKey].messageChannel as MessageChannel - ).port1.postMessage(message, transferList) + ).port1.postMessage( + { ...message, workerId: this.workerNodes[workerNodeKey].info.id }, + transferList + ) } /** @inheritDoc */ protected sendStartupMessageToWorker (workerNodeKey: number): void { const workerNode = this.workerNodes[workerNodeKey] - const worker = workerNode.worker const port2: MessagePort = (workerNode.messageChannel as MessageChannel) .port2 - worker.postMessage( + workerNode.worker.postMessage( { ready: false, workerId: workerNode.info.id, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 68c8cab2..ed667702 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -161,21 +161,21 @@ implements IWorkerNode { /** @inheritdoc */ public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined { - if (!Array.isArray(this.info.taskFunctions)) { + if (!Array.isArray(this.info.taskFunctionNames)) { throw new Error( `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined` ) } if ( - Array.isArray(this.info.taskFunctions) && - this.info.taskFunctions.length < 3 + Array.isArray(this.info.taskFunctionNames) && + this.info.taskFunctionNames.length < 3 ) { throw new Error( `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements` ) } if (name === DEFAULT_TASK_NAME) { - name = this.info.taskFunctions[1] + name = this.info.taskFunctionNames[1] } if (!this.taskFunctionsUsage.has(name)) { this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name)) @@ -249,7 +249,7 @@ implements IWorkerNode { for (const task of this.tasksQueue) { if ( (task.name === DEFAULT_TASK_NAME && - name === (this.info.taskFunctions as string[])[1]) || + name === (this.info.taskFunctionNames as string[])[1]) || (task.name !== DEFAULT_TASK_NAME && name === task.name) ) { ++taskFunctionQueueSize diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 29050455..37d63085 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -144,7 +144,7 @@ export interface WorkerInfo { /** * Task function names. */ - taskFunctions?: string[] + taskFunctionNames?: string[] } /** diff --git a/src/utility-types.ts b/src/utility-types.ts index e1fb311e..3817515a 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -3,13 +3,13 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' /** - * Task error. + * Worker error. * * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ -export interface TaskError { +export interface WorkerError { /** - * Task name triggering the error. + * Task function name triggering the error. */ readonly name: string /** @@ -72,7 +72,7 @@ export interface Task { /** * Worker id. */ - readonly workerId: number + readonly workerId?: number /** * Task name. */ @@ -109,17 +109,36 @@ export interface MessageValue */ readonly kill?: KillBehavior | true | 'success' | 'failure' /** - * Task error. + * Worker error. */ - readonly taskError?: TaskError + readonly workerError?: WorkerError /** * Task performance. */ readonly taskPerformance?: TaskPerformance + /** + * Task function operation: + * - `'add'` - Add a task function. + * - `'delete'` - Delete a task function. + * - `'default'` - Set a task function as default. + */ + readonly taskFunctionOperation?: 'add' | 'remove' | 'default' + /** + * Whether the task function operation is successful or not. + */ + readonly taskFunctionOperationStatus?: boolean + /** + * Task function serialized to string. + */ + readonly taskFunction?: string + /** + * Task function name. + */ + readonly taskFunctionName?: string /** * Task function names. */ - readonly taskFunctions?: string[] + readonly taskFunctionNames?: string[] /** * Whether the worker computes the given statistics or not. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index f0ef5932..bc62ade5 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -18,6 +18,7 @@ import { KillBehaviors, type WorkerOptions } from './worker-options' import type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './task-functions' @@ -199,11 +200,14 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to check. * @returns Whether the worker has a task function with the given name or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. */ - public hasTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - return this.taskFunctions.has(name) + public hasTaskFunction (name: string): TaskFunctionOperationReturnType { + try { + this.checkTaskFunctionName(name) + } catch (error) { + return { status: false, error: error as Error } + } + return { status: this.taskFunctions.has(name) } } /** @@ -213,24 +217,21 @@ export abstract class AbstractWorker< * @param name - The name of the task function to add. * @param fn - The task function to add. * @returns Whether the task function was added or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function. */ public addTaskFunction ( name: string, fn: TaskFunction - ): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot add a task function with the default reserved name' - ) - } - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') - } + ): TaskFunctionOperationReturnType { try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot add a task function with the default reserved name' + ) + } + if (typeof fn !== 'function') { + throw new TypeError('fn parameter is not a function') + } const boundFn = fn.bind(this) if ( this.taskFunctions.get(name) === @@ -239,10 +240,10 @@ export abstract class AbstractWorker< this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) } this.taskFunctions.set(name, boundFn) - this.sendTaskFunctionsListToMainWorker() - return true - } catch { - return false + this.sendTaskFunctionNamesToMainWorker() + return { status: true } + } catch (error) { + return { status: false, error: error as Error } } } @@ -251,27 +252,29 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to remove. * @returns Whether the task function existed and was removed or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function. */ - public removeTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot remove the task function with the default reserved name' - ) - } - if ( - this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME) - ) { - throw new Error( - 'Cannot remove the task function used as the default task function' - ) + public removeTaskFunction (name: string): TaskFunctionOperationReturnType { + try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot remove the task function with the default reserved name' + ) + } + if ( + this.taskFunctions.get(name) === + this.taskFunctions.get(DEFAULT_TASK_NAME) + ) { + throw new Error( + 'Cannot remove the task function used as the default task function' + ) + } + const deleteStatus = this.taskFunctions.delete(name) + this.sendTaskFunctionNamesToMainWorker() + return { status: deleteStatus } + } catch (error) { + return { status: false, error: error as Error } } - const deleteStatus = this.taskFunctions.delete(name) - this.sendTaskFunctionsListToMainWorker() - return deleteStatus } /** @@ -279,7 +282,7 @@ export abstract class AbstractWorker< * * @returns The names of the worker's task functions. */ - public listTaskFunctions (): string[] { + public listTaskFunctionNames (): string[] { const names: string[] = [...this.taskFunctions.keys()] let defaultTaskFunctionName: string = DEFAULT_TASK_NAME for (const [name, fn] of this.taskFunctions) { @@ -305,30 +308,27 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to use as default task function. * @returns Whether the default task function was set or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function. */ - public setDefaultTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot set the default task function reserved name as the default task function' - ) - } - if (!this.taskFunctions.has(name)) { - throw new Error( - 'Cannot set the default task function to a non-existing task function' - ) - } + public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType { try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot set the default task function reserved name as the default task function' + ) + } + if (!this.taskFunctions.has(name)) { + throw new Error( + 'Cannot set the default task function to a non-existing task function' + ) + } this.taskFunctions.set( DEFAULT_TASK_NAME, this.taskFunctions.get(name) as TaskFunction ) - return true - } catch { - return false + return { status: true } + } catch (error) { + return { status: false, error: error as Error } } } @@ -361,6 +361,9 @@ export abstract class AbstractWorker< } else if (message.checkActive != null) { // Check active message received message.checkActive ? this.startCheckActive() : this.stopCheckActive() + } else if (message.taskFunctionOperation != null) { + // Task function operation message received + this.handleTaskFunctionOperationMessage(message) } else if (message.taskId != null && message.data != null) { // Task message received this.run(message) @@ -370,6 +373,35 @@ export abstract class AbstractWorker< } } + protected handleTaskFunctionOperationMessage ( + message: MessageValue + ): void { + const { taskFunctionOperation, taskFunction, taskFunctionName } = message + let response!: TaskFunctionOperationReturnType + if (taskFunctionOperation === 'add') { + response = this.addTaskFunction( + taskFunctionName as string, + // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func + new Function(`return ${taskFunction as string}`)() as TaskFunction< + Data, + Response + > + ) + } else if (taskFunctionOperation === 'remove') { + response = this.removeTaskFunction(taskFunctionName as string) + } else if (taskFunctionOperation === 'default') { + response = this.setDefaultTaskFunction(taskFunctionName as string) + } + this.sendToMainWorker({ + taskFunctionOperation, + taskFunctionOperationStatus: response.status, + workerError: { + name: taskFunctionName as string, + message: this.handleError(response.error as Error | string) + } + }) + } + /** * Handles a kill message sent by the main worker. * @@ -380,11 +412,11 @@ export abstract class AbstractWorker< if (isAsyncFunction(this.opts.killHandler)) { (this.opts.killHandler?.() as Promise) .then(() => { - this.sendToMainWorker({ kill: 'success', workerId: this.id }) + this.sendToMainWorker({ kill: 'success' }) return null }) .catch(() => { - this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + this.sendToMainWorker({ kill: 'failure' }) }) .finally(() => { this.emitDestroy() @@ -394,9 +426,9 @@ export abstract class AbstractWorker< try { // eslint-disable-next-line @typescript-eslint/no-invalid-void-type this.opts.killHandler?.() as void - this.sendToMainWorker({ kill: 'success', workerId: this.id }) + this.sendToMainWorker({ kill: 'success' }) } catch { - this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + this.sendToMainWorker({ kill: 'failure' }) } finally { this.emitDestroy() } @@ -448,7 +480,7 @@ export abstract class AbstractWorker< performance.now() - this.lastTaskTimestamp > (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) ) { - this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id }) + this.sendToMainWorker({ kill: this.opts.killBehavior }) } } @@ -475,12 +507,11 @@ export abstract class AbstractWorker< ): void /** - * Sends the list of task function names to the main worker. + * Sends task function names to the main worker. */ - protected sendTaskFunctionsListToMainWorker (): void { + protected sendTaskFunctionNamesToMainWorker (): void { this.sendToMainWorker({ - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } @@ -505,12 +536,11 @@ export abstract class AbstractWorker< const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME) if (fn == null) { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: `Task function '${name as string}' not found`, data }, - workerId: this.id, taskId }) return @@ -540,17 +570,15 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, - workerId: this.id, taskId }) } catch (error) { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: this.handleError(error as Error | string), data }, - workerId: this.id, taskId }) } finally { @@ -576,19 +604,17 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, - workerId: this.id, taskId }) return null }) .catch(error => { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: this.handleError(error as Error | string), data }, - workerId: this.id, taskId }) }) diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 26964aa4..201a516c 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -48,14 +48,12 @@ export class ClusterWorker< this.getMainWorker().on('message', this.messageListener.bind(this)) this.sendToMainWorker({ ready: true, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } catch { this.sendToMainWorker({ ready: false, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } } @@ -68,6 +66,6 @@ export class ClusterWorker< /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - this.getMainWorker().send(message) + this.getMainWorker().send({ ...message, workerId: this.id }) } } diff --git a/src/worker/task-functions.ts b/src/worker/task-functions.ts index a61d6962..353b8b0c 100644 --- a/src/worker/task-functions.ts +++ b/src/worker/task-functions.ts @@ -43,3 +43,11 @@ export type TaskFunctions = Record< string, TaskFunction > + +/** + * Task function operation return type. + */ +export interface TaskFunctionOperationReturnType { + status: boolean + error?: Error +} diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index d6a36992..7b92caf6 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -62,14 +62,12 @@ export class ThreadWorker< this.port.on('message', this.messageListener.bind(this)) this.sendToMainWorker({ ready: true, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } catch { this.sendToMainWorker({ ready: false, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } } @@ -89,7 +87,7 @@ export class ThreadWorker< /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - this.port.postMessage(message) + this.port.postMessage({ ...message, workerId: this.id }) } /** @inheritDoc */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index e9c65dde..93e7c256 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1250,7 +1250,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([ + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', @@ -1261,7 +1261,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([ + expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', @@ -1289,14 +1289,14 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctions).toStrictEqual([ + expect(workerNode.info.taskFunctionNames).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctions()) { + for (const name of pool.listTaskFunctionNames()) { expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ tasks: { executed: expect.any(Number), @@ -1327,7 +1327,9 @@ describe('Abstract pool test suite', () => { expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( - workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1]) + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionNames[1] + ) ) } await pool.destroy() diff --git a/tests/pools/abstract/worker-node.test.js b/tests/pools/abstract/worker-node.test.js index 45b97c1c..f6305fe7 100644 --- a/tests/pools/abstract/worker-node.test.js +++ b/tests/pools/abstract/worker-node.test.js @@ -139,7 +139,7 @@ describe('Worker node test suite', () => { "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined" ) ) - threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1'] + threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1'] expect(() => threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrowError( @@ -147,7 +147,7 @@ describe('Worker node test suite', () => { "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements" ) ) - threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] + threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] expect( threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual({ diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 5860ebff..87a78f5d 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -223,16 +223,20 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(() => worker.hasTaskFunction(0)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.hasTaskFunction('')).toThrowError( - new TypeError('name parameter is an empty string') - ) - expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) - expect(worker.hasTaskFunction('fn1')).toBe(true) - expect(worker.hasTaskFunction('fn2')).toBe(true) - expect(worker.hasTaskFunction('fn3')).toBe(false) + expect(worker.hasTaskFunction(0)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.hasTaskFunction('')).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) + expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: true + }) + expect(worker.hasTaskFunction('fn1')).toStrictEqual({ status: true }) + expect(worker.hasTaskFunction('fn2')).toStrictEqual({ status: true }) + expect(worker.hasTaskFunction('fn3')).toStrictEqual({ status: false }) }) it('Verify that addTaskFunction() works', () => { @@ -246,24 +250,30 @@ describe('Abstract worker test suite', () => { return 3 } const worker = new ThreadWorker(fn1) - expect(() => worker.addTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.addTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) - expect(() => worker.addTaskFunction('fn3', '')).toThrowError( - new TypeError('fn parameter is not a function') - ) + expect(worker.addTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.addTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) + expect(worker.addTaskFunction('fn3', '')).toStrictEqual({ + status: false, + error: new TypeError('fn parameter is not a function') + }) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.size).toBe(2) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toThrowError( - new Error('Cannot add a task function with the default reserved name') - ) + expect(worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toStrictEqual({ + status: false, + error: new Error( + 'Cannot add a task function with the default reserved name' + ) + }) worker.addTaskFunction('fn2', fn2) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) @@ -290,12 +300,14 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(() => worker.removeTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.removeTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) + expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.removeTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) worker.getMainWorker = sinon.stub().returns({ id: 1, send: sinon.stub().returns() @@ -307,16 +319,18 @@ describe('Abstract worker test suite', () => { expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.removeTaskFunction(DEFAULT_TASK_NAME)).toThrowError( - new Error( + expect(worker.removeTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: false, + error: new Error( 'Cannot remove the task function with the default reserved name' ) - ) - expect(() => worker.removeTaskFunction('fn1')).toThrowError( - new Error( + }) + expect(worker.removeTaskFunction('fn1')).toStrictEqual({ + status: false, + error: new Error( 'Cannot remove the task function used as the default task function' ) - ) + }) worker.removeTaskFunction('fn2') expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) @@ -333,7 +347,7 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(worker.listTaskFunctions()).toStrictEqual([ + expect(worker.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'fn1', 'fn2' @@ -348,12 +362,14 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ThreadWorker({ fn1, fn2 }) - expect(() => worker.setDefaultTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.setDefaultTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) + expect(worker.setDefaultTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.setDefaultTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) @@ -361,16 +377,18 @@ describe('Abstract worker test suite', () => { expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toThrowError( - new Error( + expect(worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: false, + error: new Error( 'Cannot set the default task function reserved name as the default task function' ) - ) - expect(() => worker.setDefaultTaskFunction('fn3')).toThrowError( - new Error( + }) + expect(worker.setDefaultTaskFunction('fn3')).toStrictEqual({ + status: false, + error: new Error( 'Cannot set the default task function to a non-existing task function' ) - ) + }) worker.setDefaultTaskFunction('fn1') expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1')