From: Jérôme Benoit Date: Mon, 18 Sep 2023 21:42:22 +0000 (+0200) Subject: Merge branch 'master' into feature/task-functions X-Git-Tag: v2.7.0~1^2~8 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=103057eedf4f7b18f009333806d0293bfb23e204;hp=f083bd34985d16051031c1ed0c713c5aff86f464;p=poolifier.git Merge branch 'master' into feature/task-functions --- diff --git a/CHANGELOG.md b/CHANGELOG.md index ad5bfde0..b6ba6fba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API. + +### Added + +- Add `hasTaskFunction()`, `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API. - Stricter worker constructor arguments validation. ## [2.6.45] - 2023-09-17 diff --git a/docs/api.md b/docs/api.md index df7d1ce5..8e3c7707 100644 --- a/docs/api.md +++ b/docs/api.md @@ -8,7 +8,11 @@ - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) - [`pool.start()`](#poolstart) - [`pool.destroy()`](#pooldestroy) - - [`pool.listTaskFunctions()`](#poollisttaskfunctions) + - [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname) + - [`pool.addTaskFunction(name, fn)`](#pooladdtaskfunctionname-fn) + - [`pool.removeTaskFunction(name)`](#poolremovetaskfunctionname) + - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames) + - [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname) - [`PoolOptions`](#pooloptions) - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions) - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions) @@ -17,7 +21,7 @@ - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname) - [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn) - [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname) - - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions) + - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames) - [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname) ## Pool @@ -39,7 +43,7 @@ `data` (optional) An object that you want to pass to your worker implementation. `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'` -`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation +`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation. This method is available on both pool implementations and returns a promise with the task function execution response. @@ -51,10 +55,35 @@ This method is available on both pool implementations and will start the minimum This method is available on both pool implementations and will call the terminate method on each worker. -### `pool.listTaskFunctions()` +### `pool.hasTaskFunction(name)` + +`name` (mandatory) The task function name. + +This method is available on both pool implementations and returns a boolean. + +### `pool.addTaskFunction(name, fn)` + +`name` (mandatory) The task function name. +`fn` (mandatory) The task function. + +This method is available on both pool implementations and returns a boolean promise. + +### `pool.removeTaskFunction(name)` + +`name` (mandatory) The task function name. + +This method is available on both pool implementations and returns a boolean promise. + +### `pool.listTaskFunctionNames()` This method is available on both pool implementations and returns an array of the task function names. +### `pool.setDefaultTaskFunction(name)` + +`name` (mandatory) The task function name. + +This method is available on both pool implementations and returns a boolean promise. + ### `PoolOptions` An object with these properties: @@ -148,22 +177,22 @@ An object with these properties: `name` (mandatory) The task function name. -This method is available on both worker implementations and returns a boolean. +This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`. #### `YourWorker.addTaskFunction(name, fn)` `name` (mandatory) The task function name. `fn` (mandatory) The task function. -This method is available on both worker implementations and returns a boolean. +This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`. #### `YourWorker.removeTaskFunction(name)` `name` (mandatory) The task function name. -This method is available on both worker implementations and returns a boolean. +This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`. -#### `YourWorker.listTaskFunctions()` +#### `YourWorker.listTaskFunctionNames()` This method is available on both worker implementations and returns an array of the task function names. @@ -171,4 +200,4 @@ This method is available on both worker implementations and returns an array of `name` (mandatory) The task function name. -This method is available on both worker implementations and returns a boolean. +This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`. diff --git a/src/index.ts b/src/index.ts index 08943274..8faba255 100644 --- a/src/index.ts +++ b/src/index.ts @@ -60,6 +60,7 @@ export type { export type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './worker/task-functions' @@ -67,7 +68,7 @@ export type { MessageValue, PromiseResponseWrapper, Task, - TaskError, + WorkerError, TaskPerformance, WorkerStatistics, Writable diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index cbc24e7d..1bd5ee2c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,6 +21,7 @@ import { updateMeasurementStatistics } from '../utils' import { KillBehaviors } from '../worker/worker-options' +import type { TaskFunction } from '../worker/task-functions' import { type IPool, PoolEmitter, @@ -91,6 +92,13 @@ export abstract class AbstractPool< */ protected readonly max?: number + /** + * The task functions added at runtime map: + * - `key`: The task function name. + * - `value`: The task function itself. + */ + private readonly taskFunctions: Map> + /** * Whether the pool is started or not. */ @@ -144,6 +152,8 @@ export abstract class AbstractPool< this.setupHook() + this.taskFunctions = new Map>() + this.started = false this.starting = false if (this.opts.startWorkers === true) { @@ -593,7 +603,7 @@ export abstract class AbstractPool< * @param workerId - The worker id. * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number { + private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number { return this.workerNodes.findIndex( workerNode => workerNode.info.id === workerId ) @@ -754,19 +764,142 @@ export abstract class AbstractPool< ) } + private async sendTaskFunctionOperationToWorker ( + workerNodeKey: number, + message: MessageValue + ): Promise { + const workerId = this.getWorkerInfo(workerNodeKey).id as number + return await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, message => { + if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === true + ) { + resolve(true) + } else if ( + message.workerId === workerId && + message.taskFunctionOperationStatus === false + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId}` + ) + ) + } + }) + this.sendToWorker(workerNodeKey, message) + }) + } + + private async sendTaskFunctionOperationToWorkers ( + message: Omit, 'workerId'> + ): Promise { + return await new Promise((resolve, reject) => { + const responsesReceived = new Array>() + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.registerWorkerMessageListener(workerNodeKey, message => { + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.every( + message => message.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else if ( + responsesReceived.length === this.workerNodes.length && + responsesReceived.some( + message => message.taskFunctionOperationStatus === false + ) + ) { + reject( + new Error( + `Task function operation ${ + message.taskFunctionOperation as string + } failed on worker ${message.workerId as number}` + ) + ) + } + } + }) + this.sendToWorker(workerNodeKey, message) + } + }) + } + /** @inheritDoc */ - public listTaskFunctions (): string[] { + public hasTaskFunction (name: string): boolean { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctions) && - workerNode.info.taskFunctions.length > 0 + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.includes(name) ) { - return workerNode.info.taskFunctions + return true + } + } + return false + } + + /** @inheritDoc */ + public async addTaskFunction ( + name: string, + fn: TaskFunction + ): Promise { + if (typeof name !== 'string') { + throw new TypeError('name argument must be a string') + } + if (typeof name === 'string' && name.trim().length === 0) { + throw new TypeError('name argument must not be an empty string') + } + if (typeof fn !== 'function') { + throw new TypeError('fn argument must be a function') + } + this.taskFunctions.set(name, fn) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: name, + taskFunction: fn.toString() + }) + } + + /** @inheritDoc */ + public async removeTaskFunction (name: string): Promise { + if (!this.taskFunctions.has(name)) { + throw new Error( + 'Cannot remove a task function not handled on the pool side' + ) + } + this.taskFunctions.delete(name) + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'remove', + taskFunctionName: name + }) + } + + /** @inheritDoc */ + public listTaskFunctionNames (): string[] { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctionNames) && + workerNode.info.taskFunctionNames.length > 0 + ) { + return workerNode.info.taskFunctionNames } } return [] } + /** @inheritDoc */ + public async setDefaultTaskFunction (name: string): Promise { + return await this.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'default', + taskFunctionName: name + }) + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -810,7 +943,6 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -858,18 +990,23 @@ export abstract class AbstractPool< } protected async sendKillMessageToWorker ( - workerNodeKey: number, - workerId: number + workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { - reject(new Error(`Worker ${workerId} kill message handling failed`)) + reject( + new Error( + `Worker ${ + message.workerId as number + } kill message handling failed` + ) + ) } }) - this.sendToWorker(workerNodeKey, { kill: true, workerId }) + this.sendToWorker(workerNodeKey, { kill: true }) }) } @@ -969,8 +1106,8 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 2 + Array.isArray(workerInfo.taskFunctionNames) && + workerInfo.taskFunctionNames.length > 2 ) } @@ -985,7 +1122,7 @@ export abstract class AbstractPool< ) { --workerTaskStatistics.executing } - if (message.taskError == null) { + if (message.workerError == null) { ++workerTaskStatistics.executed } else { ++workerTaskStatistics.failed @@ -996,7 +1133,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } updateMeasurementStatistics( @@ -1023,7 +1160,7 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { - if (message.taskError != null) { + if (message.workerError != null) { return } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = @@ -1173,9 +1310,19 @@ export abstract class AbstractPool< }) const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { - checkActive: true, - workerId: workerInfo.id as number + checkActive: true }) + if (this.taskFunctions.size > 0) { + for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + this.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName, + taskFunction: taskFunction.toString() + }).catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + } workerInfo.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || @@ -1245,8 +1392,7 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + } }) } @@ -1262,11 +1408,7 @@ export abstract class AbstractPool< }, 0 ) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = this.dequeueTask(workerNodeKey) as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1296,7 +1438,6 @@ export abstract class AbstractPool< private taskStealingOnEmptyQueue (workerId: number): void { const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] const workerNodes = this.workerNodes .slice() .sort( @@ -1310,10 +1451,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: destinationWorkerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(destinationWorkerNodeKey)) { this.executeTask(destinationWorkerNodeKey, task) } else { @@ -1347,10 +1485,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { - const task = { - ...(sourceWorkerNode.popTask() as Task), - workerId: workerNode.info.id as number - } + const task = sourceWorkerNode.popTask() as Task if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) } else { @@ -1372,42 +1507,44 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return message => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctions != null) { + if (message.ready != null && message.taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctions != null) { - // Task functions message received from worker + } else if (message.taskFunctionNames != null) { + // Task function names message received from worker this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctions = message.taskFunctions + ).taskFunctionNames = message.taskFunctionNames } } } private handleWorkerReadyResponse (message: MessageValue): void { if (message.ready === false) { - throw new Error(`Worker ${message.workerId} failed to initialize`) + throw new Error( + `Worker ${message.workerId as number} failed to initialize` + ) } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) ) workerInfo.ready = message.ready as boolean - workerInfo.taskFunctions = message.taskFunctions + workerInfo.taskFunctionNames = message.taskFunctionNames if (this.ready) { this.emitter?.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, taskError, data } = message + const { taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (taskError != null) { - this.emitter?.emit(PoolEvents.taskError, taskError) - promiseResponse.reject(taskError.message) + if (workerError != null) { + this.emitter?.emit(PoolEvents.taskError, workerError) + promiseResponse.reject(workerError.message) } else { promiseResponse.resolve(data as Response) } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 17ef7e9e..9470cccd 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -73,10 +73,7 @@ export class FixedClusterPool< worker.on('disconnect', () => { worker.kill() }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) worker.disconnect() await waitWorkerExit } @@ -86,14 +83,16 @@ export class FixedClusterPool< workerNodeKey: number, message: MessageValue ): void { - this.workerNodes[workerNodeKey].worker.send(message) + this.workerNodes[workerNodeKey].worker.send({ + ...message, + workerId: this.workerNodes[workerNodeKey].info.id as number + }) } /** @inheritDoc */ protected sendStartupMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { - ready: false, - workerId: this.workerNodes[workerNodeKey].info.id as number + ready: false }) } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 711a71be..7efb3808 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'node:events' import { type TransferListItem } from 'node:worker_threads' +import type { TaskFunction } from '../worker/task-functions' import type { ErrorHandler, ExitHandler, @@ -260,12 +261,45 @@ export interface IPool< * Terminates all workers in this pool. */ readonly destroy: () => Promise + /** + * Whether the specified task function exists in this pool. + * + * @param name - The name of the task function. + * @returns `true` if the task function exists, `false` otherwise. + */ + readonly hasTaskFunction: (name: string) => boolean + /** + * Adds a task function to this pool. + * If a task function with the same name already exists, it will be overwritten. + * + * @param name - The name of the task function. + * @param fn - The task function. + * @returns `true` if the task function was added, `false` otherwise. + */ + readonly addTaskFunction: ( + name: string, + fn: TaskFunction + ) => Promise + /** + * Removes a task function from this pool. + * + * @param name - The name of the task function. + * @returns `true` if the task function was removed, `false` otherwise. + */ + readonly removeTaskFunction: (name: string) => Promise /** * Lists the names of task function available in this pool. * * @returns The names of task function available in this pool. */ - readonly listTaskFunctions: () => string[] + readonly listTaskFunctionNames: () => string[] + /** + * Sets the default task function in this pool. + * + * @param name - The name of the task function. + * @returns `true` if the default task function was set, `false` otherwise. + */ + readonly setDefaultTaskFunction: (name: string) => Promise /** * Sets the worker choice strategy in this pool. * diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 6e234e2e..2ad94a7b 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,10 +67,7 @@ export class FixedThreadPool< resolve() }) }) - await this.sendKillMessageToWorker( - workerNodeKey, - workerNode.info.id as number - ) + await this.sendKillMessageToWorker(workerNodeKey) workerNode.closeChannel() await worker.terminate() await waitWorkerExit @@ -84,16 +81,18 @@ export class FixedThreadPool< ): void { ( this.workerNodes[workerNodeKey].messageChannel as MessageChannel - ).port1.postMessage(message, transferList) + ).port1.postMessage( + { ...message, workerId: this.workerNodes[workerNodeKey].info.id }, + transferList + ) } /** @inheritDoc */ protected sendStartupMessageToWorker (workerNodeKey: number): void { const workerNode = this.workerNodes[workerNodeKey] - const worker = workerNode.worker const port2: MessagePort = (workerNode.messageChannel as MessageChannel) .port2 - worker.postMessage( + workerNode.worker.postMessage( { ready: false, workerId: workerNode.info.id, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 68c8cab2..ed667702 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -161,21 +161,21 @@ implements IWorkerNode { /** @inheritdoc */ public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined { - if (!Array.isArray(this.info.taskFunctions)) { + if (!Array.isArray(this.info.taskFunctionNames)) { throw new Error( `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined` ) } if ( - Array.isArray(this.info.taskFunctions) && - this.info.taskFunctions.length < 3 + Array.isArray(this.info.taskFunctionNames) && + this.info.taskFunctionNames.length < 3 ) { throw new Error( `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements` ) } if (name === DEFAULT_TASK_NAME) { - name = this.info.taskFunctions[1] + name = this.info.taskFunctionNames[1] } if (!this.taskFunctionsUsage.has(name)) { this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name)) @@ -249,7 +249,7 @@ implements IWorkerNode { for (const task of this.tasksQueue) { if ( (task.name === DEFAULT_TASK_NAME && - name === (this.info.taskFunctions as string[])[1]) || + name === (this.info.taskFunctionNames as string[])[1]) || (task.name !== DEFAULT_TASK_NAME && name === task.name) ) { ++taskFunctionQueueSize diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 29050455..37d63085 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -144,7 +144,7 @@ export interface WorkerInfo { /** * Task function names. */ - taskFunctions?: string[] + taskFunctionNames?: string[] } /** diff --git a/src/utility-types.ts b/src/utility-types.ts index e1fb311e..48988c08 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -3,13 +3,13 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' /** - * Task error. + * Worker error. * * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ -export interface TaskError { +export interface WorkerError { /** - * Task name triggering the error. + * Task function name triggering the error. */ readonly name: string /** @@ -72,7 +72,7 @@ export interface Task { /** * Worker id. */ - readonly workerId: number + readonly workerId?: number /** * Task name. */ @@ -109,17 +109,36 @@ export interface MessageValue */ readonly kill?: KillBehavior | true | 'success' | 'failure' /** - * Task error. + * Worker error. */ - readonly taskError?: TaskError + readonly workerError?: WorkerError /** * Task performance. */ readonly taskPerformance?: TaskPerformance + /** + * Task function operation: + * - `'add'` - Add a task function. + * - `'remove'` - Remove a task function. + * - `'default'` - Set a task function as default. + */ + readonly taskFunctionOperation?: 'add' | 'remove' | 'default' + /** + * Whether the task function operation is successful or not. + */ + readonly taskFunctionOperationStatus?: boolean + /** + * Task function serialized to string. + */ + readonly taskFunction?: string + /** + * Task function name. + */ + readonly taskFunctionName?: string /** * Task function names. */ - readonly taskFunctions?: string[] + readonly taskFunctionNames?: string[] /** * Whether the worker computes the given statistics or not. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index f0ef5932..8bbd07a7 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -18,6 +18,7 @@ import { KillBehaviors, type WorkerOptions } from './worker-options' import type { TaskAsyncFunction, TaskFunction, + TaskFunctionOperationReturnType, TaskFunctions, TaskSyncFunction } from './task-functions' @@ -199,11 +200,14 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to check. * @returns Whether the worker has a task function with the given name or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. */ - public hasTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - return this.taskFunctions.has(name) + public hasTaskFunction (name: string): TaskFunctionOperationReturnType { + try { + this.checkTaskFunctionName(name) + } catch (error) { + return { status: false, error: error as Error } + } + return { status: this.taskFunctions.has(name) } } /** @@ -213,24 +217,21 @@ export abstract class AbstractWorker< * @param name - The name of the task function to add. * @param fn - The task function to add. * @returns Whether the task function was added or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function. */ public addTaskFunction ( name: string, fn: TaskFunction - ): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot add a task function with the default reserved name' - ) - } - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') - } + ): TaskFunctionOperationReturnType { try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot add a task function with the default reserved name' + ) + } + if (typeof fn !== 'function') { + throw new TypeError('fn parameter is not a function') + } const boundFn = fn.bind(this) if ( this.taskFunctions.get(name) === @@ -239,10 +240,10 @@ export abstract class AbstractWorker< this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) } this.taskFunctions.set(name, boundFn) - this.sendTaskFunctionsListToMainWorker() - return true - } catch { - return false + this.sendTaskFunctionNamesToMainWorker() + return { status: true } + } catch (error) { + return { status: false, error: error as Error } } } @@ -251,27 +252,29 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to remove. * @returns Whether the task function existed and was removed or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function. */ - public removeTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot remove the task function with the default reserved name' - ) - } - if ( - this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME) - ) { - throw new Error( - 'Cannot remove the task function used as the default task function' - ) + public removeTaskFunction (name: string): TaskFunctionOperationReturnType { + try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot remove the task function with the default reserved name' + ) + } + if ( + this.taskFunctions.get(name) === + this.taskFunctions.get(DEFAULT_TASK_NAME) + ) { + throw new Error( + 'Cannot remove the task function used as the default task function' + ) + } + const deleteStatus = this.taskFunctions.delete(name) + this.sendTaskFunctionNamesToMainWorker() + return { status: deleteStatus } + } catch (error) { + return { status: false, error: error as Error } } - const deleteStatus = this.taskFunctions.delete(name) - this.sendTaskFunctionsListToMainWorker() - return deleteStatus } /** @@ -279,7 +282,7 @@ export abstract class AbstractWorker< * * @returns The names of the worker's task functions. */ - public listTaskFunctions (): string[] { + public listTaskFunctionNames (): string[] { const names: string[] = [...this.taskFunctions.keys()] let defaultTaskFunctionName: string = DEFAULT_TASK_NAME for (const [name, fn] of this.taskFunctions) { @@ -305,30 +308,28 @@ export abstract class AbstractWorker< * * @param name - The name of the task function to use as default task function. * @returns Whether the default task function was set or not. - * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function. */ - public setDefaultTaskFunction (name: string): boolean { - this.checkTaskFunctionName(name) - if (name === DEFAULT_TASK_NAME) { - throw new Error( - 'Cannot set the default task function reserved name as the default task function' - ) - } - if (!this.taskFunctions.has(name)) { - throw new Error( - 'Cannot set the default task function to a non-existing task function' - ) - } + public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType { try { + this.checkTaskFunctionName(name) + if (name === DEFAULT_TASK_NAME) { + throw new Error( + 'Cannot set the default task function reserved name as the default task function' + ) + } + if (!this.taskFunctions.has(name)) { + throw new Error( + 'Cannot set the default task function to a non-existing task function' + ) + } this.taskFunctions.set( DEFAULT_TASK_NAME, this.taskFunctions.get(name) as TaskFunction ) - return true - } catch { - return false + this.sendTaskFunctionNamesToMainWorker() + return { status: true } + } catch (error) { + return { status: false, error: error as Error } } } @@ -361,6 +362,9 @@ export abstract class AbstractWorker< } else if (message.checkActive != null) { // Check active message received message.checkActive ? this.startCheckActive() : this.stopCheckActive() + } else if (message.taskFunctionOperation != null) { + // Task function operation message received + this.handleTaskFunctionOperationMessage(message) } else if (message.taskId != null && message.data != null) { // Task message received this.run(message) @@ -370,6 +374,35 @@ export abstract class AbstractWorker< } } + protected handleTaskFunctionOperationMessage ( + message: MessageValue + ): void { + const { taskFunctionOperation, taskFunctionName, taskFunction } = message + let response!: TaskFunctionOperationReturnType + if (taskFunctionOperation === 'add') { + response = this.addTaskFunction( + taskFunctionName as string, + // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func + new Function(`return ${taskFunction as string}`)() as TaskFunction< + Data, + Response + > + ) + } else if (taskFunctionOperation === 'remove') { + response = this.removeTaskFunction(taskFunctionName as string) + } else if (taskFunctionOperation === 'default') { + response = this.setDefaultTaskFunction(taskFunctionName as string) + } + this.sendToMainWorker({ + taskFunctionOperation, + taskFunctionOperationStatus: response.status, + workerError: { + name: taskFunctionName as string, + message: this.handleError(response.error as Error | string) + } + }) + } + /** * Handles a kill message sent by the main worker. * @@ -380,11 +413,11 @@ export abstract class AbstractWorker< if (isAsyncFunction(this.opts.killHandler)) { (this.opts.killHandler?.() as Promise) .then(() => { - this.sendToMainWorker({ kill: 'success', workerId: this.id }) + this.sendToMainWorker({ kill: 'success' }) return null }) .catch(() => { - this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + this.sendToMainWorker({ kill: 'failure' }) }) .finally(() => { this.emitDestroy() @@ -394,9 +427,9 @@ export abstract class AbstractWorker< try { // eslint-disable-next-line @typescript-eslint/no-invalid-void-type this.opts.killHandler?.() as void - this.sendToMainWorker({ kill: 'success', workerId: this.id }) + this.sendToMainWorker({ kill: 'success' }) } catch { - this.sendToMainWorker({ kill: 'failure', workerId: this.id }) + this.sendToMainWorker({ kill: 'failure' }) } finally { this.emitDestroy() } @@ -448,7 +481,7 @@ export abstract class AbstractWorker< performance.now() - this.lastTaskTimestamp > (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) ) { - this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id }) + this.sendToMainWorker({ kill: this.opts.killBehavior }) } } @@ -475,12 +508,11 @@ export abstract class AbstractWorker< ): void /** - * Sends the list of task function names to the main worker. + * Sends task function names to the main worker. */ - protected sendTaskFunctionsListToMainWorker (): void { + protected sendTaskFunctionNamesToMainWorker (): void { this.sendToMainWorker({ - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } @@ -505,12 +537,11 @@ export abstract class AbstractWorker< const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME) if (fn == null) { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: `Task function '${name as string}' not found`, data }, - workerId: this.id, taskId }) return @@ -540,17 +571,15 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, - workerId: this.id, taskId }) } catch (error) { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: this.handleError(error as Error | string), data }, - workerId: this.id, taskId }) } finally { @@ -576,19 +605,17 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, - workerId: this.id, taskId }) return null }) .catch(error => { this.sendToMainWorker({ - taskError: { + workerError: { name: name as string, message: this.handleError(error as Error | string), data }, - workerId: this.id, taskId }) }) diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 26964aa4..201a516c 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -48,14 +48,12 @@ export class ClusterWorker< this.getMainWorker().on('message', this.messageListener.bind(this)) this.sendToMainWorker({ ready: true, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } catch { this.sendToMainWorker({ ready: false, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } } @@ -68,6 +66,6 @@ export class ClusterWorker< /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - this.getMainWorker().send(message) + this.getMainWorker().send({ ...message, workerId: this.id }) } } diff --git a/src/worker/task-functions.ts b/src/worker/task-functions.ts index a61d6962..353b8b0c 100644 --- a/src/worker/task-functions.ts +++ b/src/worker/task-functions.ts @@ -43,3 +43,11 @@ export type TaskFunctions = Record< string, TaskFunction > + +/** + * Task function operation return type. + */ +export interface TaskFunctionOperationReturnType { + status: boolean + error?: Error +} diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index d6a36992..7b92caf6 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -62,14 +62,12 @@ export class ThreadWorker< this.port.on('message', this.messageListener.bind(this)) this.sendToMainWorker({ ready: true, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } catch { this.sendToMainWorker({ ready: false, - taskFunctions: this.listTaskFunctions(), - workerId: this.id + taskFunctionNames: this.listTaskFunctionNames() }) } } @@ -89,7 +87,7 @@ export class ThreadWorker< /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - this.port.postMessage(message) + this.port.postMessage({ ...message, workerId: this.id }) } /** @inheritDoc */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index e9c65dde..99ebbfa1 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1243,34 +1243,184 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that listTaskFunctions() is working', async () => { + it('Verify that hasTaskFunction() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([ + expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) + expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe( + true + ) + expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true) + expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true) + expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false) + await dynamicThreadPool.destroy() + const fixedClusterPool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + ) + await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) + expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) + expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe( + true + ) + expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true) + expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true) + expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false) + await fixedClusterPool.destroy() + }) + + it('Verify that addTaskFunction() is working', async () => { + const dynamicThreadPool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testWorker.js' + ) + await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) + await expect( + dynamicThreadPool.addTaskFunction(0, () => {}) + ).rejects.toThrowError(new TypeError('name argument must be a string')) + await expect( + dynamicThreadPool.addTaskFunction('', () => {}) + ).rejects.toThrowError( + new TypeError('name argument must not be an empty string') + ) + await expect( + dynamicThreadPool.addTaskFunction('test', 0) + ).rejects.toThrowError(new TypeError('fn argument must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', '') + ).rejects.toThrowError(new TypeError('fn argument must be a function')) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test' + ]) + const echoTaskFunction = data => { + return data + } + await expect( + dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + ).resolves.toBe(true) + expect(dynamicThreadPool.taskFunctions.size).toBe(1) + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( + echoTaskFunction + ) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test', + 'echo' + ]) + const taskFunctionData = { test: 'test' } + const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') + expect(echoResult).toStrictEqual(taskFunctionData) + await dynamicThreadPool.destroy() + }) + + it('Verify that removeTaskFunction() is working', async () => { + const dynamicThreadPool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testWorker.js' + ) + await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test' + ]) + await expect( + dynamicThreadPool.removeTaskFunction('test') + ).rejects.toThrowError( + new Error('Cannot remove a task function not handled on the pool side') + ) + const echoTaskFunction = data => { + return data + } + await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + expect(dynamicThreadPool.taskFunctions.size).toBe(1) + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( + echoTaskFunction + ) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test', + 'echo' + ]) + await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( + true + ) + expect(dynamicThreadPool.taskFunctions.size).toBe(0) + expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test' + ]) + await dynamicThreadPool.destroy() + }) + + it('Verify that listTaskFunctionNames() is working', async () => { + const dynamicThreadPool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' + ) + await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' ]) + await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([ + expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' ]) - await dynamicThreadPool.destroy() await fixedClusterPool.destroy() }) + it('Verify that setDefaultTaskFunction() is working', async () => { + const dynamicThreadPool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' + ) + await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'jsonIntegerSerialization', + 'factorial', + 'fibonacci' + ]) + await expect( + dynamicThreadPool.setDefaultTaskFunction('factorial') + ).resolves.toBe(true) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'factorial', + 'jsonIntegerSerialization', + 'fibonacci' + ]) + await expect( + dynamicThreadPool.setDefaultTaskFunction('fibonacci') + ).resolves.toBe(true) + expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ + DEFAULT_TASK_NAME, + 'fibonacci', + 'jsonIntegerSerialization', + 'factorial' + ]) + }) + it('Verify that multiple task functions worker is working', async () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), @@ -1289,14 +1439,14 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctions).toStrictEqual([ + expect(workerNode.info.taskFunctionNames).toStrictEqual([ DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctions()) { + for (const name of pool.listTaskFunctionNames()) { expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ tasks: { executed: expect.any(Number), @@ -1327,7 +1477,9 @@ describe('Abstract pool test suite', () => { expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( - workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1]) + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionNames[1] + ) ) } await pool.destroy() diff --git a/tests/pools/abstract/worker-node.test.js b/tests/pools/abstract/worker-node.test.js index 45b97c1c..f6305fe7 100644 --- a/tests/pools/abstract/worker-node.test.js +++ b/tests/pools/abstract/worker-node.test.js @@ -139,7 +139,7 @@ describe('Worker node test suite', () => { "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined" ) ) - threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1'] + threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1'] expect(() => threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrowError( @@ -147,7 +147,7 @@ describe('Worker node test suite', () => { "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements" ) ) - threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] + threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] expect( threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual({ diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 5860ebff..200b05ac 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -200,7 +200,7 @@ describe('Abstract worker test suite', () => { expect(killHandlerStub.calledOnce).toBe(true) }) - it('Verify that handleError() method works properly', () => { + it('Verify that handleError() method is working properly', () => { const error = new Error('Error as an error') const worker = new ClusterWorker(() => {}) expect(worker.handleError(error)).not.toBeInstanceOf(Error) @@ -215,7 +215,7 @@ describe('Abstract worker test suite', () => { ).toThrowError('Main worker not set') }) - it('Verify that hasTaskFunction() works', () => { + it('Verify that hasTaskFunction() is working', () => { const fn1 = () => { return 1 } @@ -223,19 +223,23 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(() => worker.hasTaskFunction(0)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.hasTaskFunction('')).toThrowError( - new TypeError('name parameter is an empty string') - ) - expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) - expect(worker.hasTaskFunction('fn1')).toBe(true) - expect(worker.hasTaskFunction('fn2')).toBe(true) - expect(worker.hasTaskFunction('fn3')).toBe(false) + expect(worker.hasTaskFunction(0)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.hasTaskFunction('')).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) + expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: true + }) + expect(worker.hasTaskFunction('fn1')).toStrictEqual({ status: true }) + expect(worker.hasTaskFunction('fn2')).toStrictEqual({ status: true }) + expect(worker.hasTaskFunction('fn3')).toStrictEqual({ status: false }) }) - it('Verify that addTaskFunction() works', () => { + it('Verify that addTaskFunction() is working', () => { const fn1 = () => { return 1 } @@ -246,24 +250,30 @@ describe('Abstract worker test suite', () => { return 3 } const worker = new ThreadWorker(fn1) - expect(() => worker.addTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.addTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) - expect(() => worker.addTaskFunction('fn3', '')).toThrowError( - new TypeError('fn parameter is not a function') - ) + expect(worker.addTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.addTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) + expect(worker.addTaskFunction('fn3', '')).toStrictEqual({ + status: false, + error: new TypeError('fn parameter is not a function') + }) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.size).toBe(2) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toThrowError( - new Error('Cannot add a task function with the default reserved name') - ) + expect(worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toStrictEqual({ + status: false, + error: new Error( + 'Cannot add a task function with the default reserved name' + ) + }) worker.addTaskFunction('fn2', fn2) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) @@ -282,7 +292,7 @@ describe('Abstract worker test suite', () => { ) }) - it('Verify that removeTaskFunction() works', () => { + it('Verify that removeTaskFunction() is working', () => { const fn1 = () => { return 1 } @@ -290,12 +300,14 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(() => worker.removeTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.removeTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) + expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.removeTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) worker.getMainWorker = sinon.stub().returns({ id: 1, send: sinon.stub().returns() @@ -307,16 +319,18 @@ describe('Abstract worker test suite', () => { expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.removeTaskFunction(DEFAULT_TASK_NAME)).toThrowError( - new Error( + expect(worker.removeTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: false, + error: new Error( 'Cannot remove the task function with the default reserved name' ) - ) - expect(() => worker.removeTaskFunction('fn1')).toThrowError( - new Error( + }) + expect(worker.removeTaskFunction('fn1')).toStrictEqual({ + status: false, + error: new Error( 'Cannot remove the task function used as the default task function' ) - ) + }) worker.removeTaskFunction('fn2') expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) @@ -325,7 +339,7 @@ describe('Abstract worker test suite', () => { expect(worker.getMainWorker().send.calledOnce).toBe(true) }) - it('Verify that listTaskFunctions() works', () => { + it('Verify that listTaskFunctionNames() is working', () => { const fn1 = () => { return 1 } @@ -333,14 +347,14 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(worker.listTaskFunctions()).toStrictEqual([ + expect(worker.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'fn1', 'fn2' ]) }) - it('Verify that setDefaultTaskFunction() works', () => { + it('Verify that setDefaultTaskFunction() is working', () => { const fn1 = () => { return 1 } @@ -348,12 +362,14 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ThreadWorker({ fn1, fn2 }) - expect(() => worker.setDefaultTaskFunction(0, fn1)).toThrowError( - new TypeError('name parameter is not a string') - ) - expect(() => worker.setDefaultTaskFunction('', fn1)).toThrowError( - new TypeError('name parameter is an empty string') - ) + expect(worker.setDefaultTaskFunction(0, fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is not a string') + }) + expect(worker.setDefaultTaskFunction('', fn1)).toStrictEqual({ + status: false, + error: new TypeError('name parameter is an empty string') + }) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) @@ -361,16 +377,18 @@ describe('Abstract worker test suite', () => { expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1') ) - expect(() => worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toThrowError( - new Error( + expect(worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({ + status: false, + error: new Error( 'Cannot set the default task function reserved name as the default task function' ) - ) - expect(() => worker.setDefaultTaskFunction('fn3')).toThrowError( - new Error( + }) + expect(worker.setDefaultTaskFunction('fn3')).toStrictEqual({ + status: false, + error: new Error( 'Cannot set the default task function to a non-existing task function' ) - ) + }) worker.setDefaultTaskFunction('fn1') expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual( worker.taskFunctions.get('fn1')