Merge branch 'master' into feature/task-functions
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 19 Sep 2023 11:20:28 +0000 (13:20 +0200)
committerGitHub <noreply@github.com>
Tue, 19 Sep 2023 11:20:28 +0000 (13:20 +0200)
17 files changed:
CHANGELOG.md
docs/api.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/task-functions.ts
src/worker/thread-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/abstract/worker-node.test.js
tests/worker/abstract-worker.test.js

index ad5bfde0f264bd84b5719d8dba1b635e1ff033f0..b6ba6fba70f58830691c9da795caedaf51595354 100644 (file)
@@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API.
+
+### Added
+
+- Add `hasTaskFunction()`, `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
 - Stricter worker constructor arguments validation.
 
 ## [2.6.45] - 2023-09-17
index df7d1ce5c1fa63def41ccbedbd1522f19c350aca..8e3c77076f231bda67064b51eba22cd4150c3d77 100644 (file)
@@ -8,7 +8,11 @@
   - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
   - [`pool.start()`](#poolstart)
   - [`pool.destroy()`](#pooldestroy)
-  - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
+  - [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname)
+  - [`pool.addTaskFunction(name, fn)`](#pooladdtaskfunctionname-fn)
+  - [`pool.removeTaskFunction(name)`](#poolremovetaskfunctionname)
+  - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
+  - [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname)
   - [`PoolOptions`](#pooloptions)
     - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
     - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
@@ -17,7 +21,7 @@
     - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
     - [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
     - [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
-    - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions)
+    - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
     - [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
 
 ## Pool
@@ -39,7 +43,7 @@
 
 `data` (optional) An object that you want to pass to your worker implementation.  
 `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`  
-`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation
+`transferList` (optional) An array of transferable objects that you want to transfer to your [worker_threads](https://nodejs.org/api/worker_threads.html) worker implementation.
 
 This method is available on both pool implementations and returns a promise with the task function execution response.
 
@@ -51,10 +55,35 @@ This method is available on both pool implementations and will start the minimum
 
 This method is available on both pool implementations and will call the terminate method on each worker.
 
-### `pool.listTaskFunctions()`
+### `pool.hasTaskFunction(name)`
+
+`name` (mandatory) The task function name.
+
+This method is available on both pool implementations and returns a boolean.
+
+### `pool.addTaskFunction(name, fn)`
+
+`name` (mandatory) The task function name.
+`fn` (mandatory) The task function.
+
+This method is available on both pool implementations and returns a boolean promise.
+
+### `pool.removeTaskFunction(name)`
+
+`name` (mandatory) The task function name.
+
+This method is available on both pool implementations and returns a boolean promise.
+
+### `pool.listTaskFunctionNames()`
 
 This method is available on both pool implementations and returns an array of the task function names.
 
+### `pool.setDefaultTaskFunction(name)`
+
+`name` (mandatory) The task function name.
+
+This method is available on both pool implementations and returns a boolean promise.
+
 ### `PoolOptions`
 
 An object with these properties:
@@ -148,22 +177,22 @@ An object with these properties:
 
 `name` (mandatory) The task function name.
 
-This method is available on both worker implementations and returns a boolean.
+This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
 
 #### `YourWorker.addTaskFunction(name, fn)`
 
 `name` (mandatory) The task function name.  
 `fn` (mandatory) The task function.
 
-This method is available on both worker implementations and returns a boolean.
+This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
 
 #### `YourWorker.removeTaskFunction(name)`
 
 `name` (mandatory) The task function name.
 
-This method is available on both worker implementations and returns a boolean.
+This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
 
-#### `YourWorker.listTaskFunctions()`
+#### `YourWorker.listTaskFunctionNames()`
 
 This method is available on both worker implementations and returns an array of the task function names.
 
@@ -171,4 +200,4 @@ This method is available on both worker implementations and returns an array of
 
 `name` (mandatory) The task function name.
 
-This method is available on both worker implementations and returns a boolean.
+This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
index 08943274da27c8176bcf4b6cebe3d8d408cc2656..8faba2552bb4cb772724aa350016471a36d659d2 100644 (file)
@@ -60,6 +60,7 @@ export type {
 export type {
   TaskAsyncFunction,
   TaskFunction,
+  TaskFunctionOperationReturnType,
   TaskFunctions,
   TaskSyncFunction
 } from './worker/task-functions'
@@ -67,7 +68,7 @@ export type {
   MessageValue,
   PromiseResponseWrapper,
   Task,
-  TaskError,
+  WorkerError,
   TaskPerformance,
   WorkerStatistics,
   Writable
index cbc24e7d84c65c3313f0e27aea9f927ea7bfe543..33585218249e599cd1998fdc19158e7ab4b22168 100644 (file)
@@ -21,6 +21,7 @@ import {
   updateMeasurementStatistics
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
 import {
   type IPool,
   PoolEmitter,
@@ -91,6 +92,13 @@ export abstract class AbstractPool<
    */
   protected readonly max?: number
 
+  /**
+   * The task functions added at runtime map:
+   * - `key`: The task function name.
+   * - `value`: The task function itself.
+   */
+  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
   /**
    * Whether the pool is started or not.
    */
@@ -144,6 +152,8 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
+    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
     this.started = false
     this.starting = false
     if (this.opts.startWorkers === true) {
@@ -593,7 +603,7 @@ export abstract class AbstractPool<
    * @param workerId - The worker id.
    * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKeyByWorkerId (workerId: number): number {
+  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
     return this.workerNodes.findIndex(
       workerNode => workerNode.info.id === workerId
     )
@@ -754,19 +764,151 @@ export abstract class AbstractPool<
     )
   }
 
+  private async sendTaskFunctionOperationToWorker (
+    workerNodeKey: number,
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const workerId = this.getWorkerInfo(workerNodeKey).id as number
+      this.registerWorkerMessageListener(workerNodeKey, message => {
+        if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === true
+        ) {
+          resolve(true)
+        } else if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === false
+        ) {
+          reject(
+            new Error(
+              `Task function operation ${
+                message.taskFunctionOperation as string
+              } failed on worker ${message.workerId}`
+            )
+          )
+        }
+      })
+      this.sendToWorker(workerNodeKey, message)
+    })
+  }
+
+  private async sendTaskFunctionOperationToWorkers (
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const responsesReceived = new Array<MessageValue<Data | Response>>()
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(workerNodeKey, message => {
+          if (message.taskFunctionOperationStatus != null) {
+            responsesReceived.push(message)
+            if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.every(
+                message => message.taskFunctionOperationStatus === true
+              )
+            ) {
+              resolve(true)
+            } else if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.some(
+                message => message.taskFunctionOperationStatus === false
+              )
+            ) {
+              reject(
+                new Error(
+                  `Task function operation ${
+                    message.taskFunctionOperation as string
+                  } failed on worker ${message.workerId as number}`
+                )
+              )
+            }
+          }
+        })
+        this.sendToWorker(workerNodeKey, message)
+      }
+    })
+  }
+
   /** @inheritDoc */
-  public listTaskFunctions (): string[] {
+  public hasTaskFunction (name: string): boolean {
     for (const workerNode of this.workerNodes) {
       if (
-        Array.isArray(workerNode.info.taskFunctions) &&
-        workerNode.info.taskFunctions.length > 0
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.includes(name)
       ) {
-        return workerNode.info.taskFunctions
+        return true
+      }
+    }
+    return false
+  }
+
+  /** @inheritDoc */
+  public async addTaskFunction (
+    name: string,
+    fn: TaskFunction<Data, Response>
+  ): Promise<boolean> {
+    if (typeof name !== 'string') {
+      throw new TypeError('name argument must be a string')
+    }
+    if (typeof name === 'string' && name.trim().length === 0) {
+      throw new TypeError('name argument must not be an empty string')
+    }
+    if (typeof fn !== 'function') {
+      throw new TypeError('fn argument must be a function')
+    }
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'add',
+      taskFunctionName: name,
+      taskFunction: fn.toString()
+    })
+    this.taskFunctions.set(name, fn)
+    return opResult
+  }
+
+  /** @inheritDoc */
+  public async removeTaskFunction (name: string): Promise<boolean> {
+    if (!this.taskFunctions.has(name)) {
+      throw new Error(
+        'Cannot remove a task function not handled on the pool side'
+      )
+    }
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'remove',
+      taskFunctionName: name
+    })
+    this.deleteTaskFunctionWorkerUsages(name)
+    this.taskFunctions.delete(name)
+    return opResult
+  }
+
+  /** @inheritDoc */
+  public listTaskFunctionNames (): string[] {
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.length > 0
+      ) {
+        return workerNode.info.taskFunctionNames
       }
     }
     return []
   }
 
+  /** @inheritDoc */
+  public async setDefaultTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'default',
+      taskFunctionName: name
+    })
+  }
+
+  private deleteTaskFunctionWorkerUsages (name: string): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
+  }
+
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&
@@ -810,7 +952,6 @@ export abstract class AbstractPool<
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -858,18 +999,23 @@ export abstract class AbstractPool<
   }
 
   protected async sendKillMessageToWorker (
-    workerNodeKey: number,
-    workerId: number
+    workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
       this.registerWorkerMessageListener(workerNodeKey, message => {
         if (message.kill === 'success') {
           resolve()
         } else if (message.kill === 'failure') {
-          reject(new Error(`Worker ${workerId} kill message handling failed`))
+          reject(
+            new Error(
+              `Worker ${
+                message.workerId as number
+              } kill message handling failed`
+            )
+          )
         }
       })
-      this.sendToWorker(workerNodeKey, { kill: true, workerId })
+      this.sendToWorker(workerNodeKey, { kill: true })
     })
   }
 
@@ -969,8 +1115,8 @@ export abstract class AbstractPool<
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
       workerInfo != null &&
-      Array.isArray(workerInfo.taskFunctions) &&
-      workerInfo.taskFunctions.length > 2
+      Array.isArray(workerInfo.taskFunctionNames) &&
+      workerInfo.taskFunctionNames.length > 2
     )
   }
 
@@ -985,7 +1131,7 @@ export abstract class AbstractPool<
     ) {
       --workerTaskStatistics.executing
     }
-    if (message.taskError == null) {
+    if (message.workerError == null) {
       ++workerTaskStatistics.executed
     } else {
       ++workerTaskStatistics.failed
@@ -996,7 +1142,7 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
-    if (message.taskError != null) {
+    if (message.workerError != null) {
       return
     }
     updateMeasurementStatistics(
@@ -1023,7 +1169,7 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
-    if (message.taskError != null) {
+    if (message.workerError != null) {
       return
     }
     const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
@@ -1173,9 +1319,19 @@ export abstract class AbstractPool<
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     this.sendToWorker(workerNodeKey, {
-      checkActive: true,
-      workerId: workerInfo.id as number
+      checkActive: true
     })
+    if (this.taskFunctions.size > 0) {
+      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+          taskFunctionOperation: 'add',
+          taskFunctionName,
+          taskFunction: taskFunction.toString()
+        }).catch(error => {
+          this.emitter?.emit(PoolEvents.error, error)
+        })
+      }
+    }
     workerInfo.dynamic = true
     if (
       this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
@@ -1245,8 +1401,7 @@ export abstract class AbstractPool<
             .runTime.aggregate,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
-      },
-      workerId: this.getWorkerInfo(workerNodeKey).id as number
+      }
     })
   }
 
@@ -1262,11 +1417,7 @@ export abstract class AbstractPool<
         },
         0
       )
-      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
-      const task = {
-        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = this.dequeueTask(workerNodeKey) as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1296,7 +1447,6 @@ export abstract class AbstractPool<
 
   private taskStealingOnEmptyQueue (workerId: number): void {
     const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
-    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1310,10 +1460,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
-      const task = {
-        ...(sourceWorkerNode.popTask() as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = sourceWorkerNode.popTask() as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1347,10 +1494,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
-        const task = {
-          ...(sourceWorkerNode.popTask() as Task<Data>),
-          workerId: workerNode.info.id as number
-        }
+        const task = sourceWorkerNode.popTask() as Task<Data>
         if (this.shallExecuteTask(workerNodeKey)) {
           this.executeTask(workerNodeKey, task)
         } else {
@@ -1372,42 +1516,44 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null && message.taskFunctions != null) {
+      if (message.ready != null && message.taskFunctionNames != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
         // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
-      } else if (message.taskFunctions != null) {
-        // Task functions message received from worker
+      } else if (message.taskFunctionNames != null) {
+        // Task function names message received from worker
         this.getWorkerInfo(
           this.getWorkerNodeKeyByWorkerId(message.workerId)
-        ).taskFunctions = message.taskFunctions
+        ).taskFunctionNames = message.taskFunctionNames
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     if (message.ready === false) {
-      throw new Error(`Worker ${message.workerId} failed to initialize`)
+      throw new Error(
+        `Worker ${message.workerId as number} failed to initialize`
+      )
     }
     const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
     )
     workerInfo.ready = message.ready as boolean
-    workerInfo.taskFunctions = message.taskFunctions
+    workerInfo.taskFunctionNames = message.taskFunctionNames
     if (this.ready) {
       this.emitter?.emit(PoolEvents.ready, this.info)
     }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const { taskId, taskError, data } = message
+    const { taskId, workerError, data } = message
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, taskError)
-        promiseResponse.reject(taskError.message)
+      if (workerError != null) {
+        this.emitter?.emit(PoolEvents.taskError, workerError)
+        promiseResponse.reject(workerError.message)
       } else {
         promiseResponse.resolve(data as Response)
       }
index 17ef7e9e57adcb91967b30be12cef158d1fc37fd..9470cccd857321388d70e0bf7f09de1a20505cfe 100644 (file)
@@ -73,10 +73,7 @@ export class FixedClusterPool<
     worker.on('disconnect', () => {
       worker.kill()
     })
-    await this.sendKillMessageToWorker(
-      workerNodeKey,
-      workerNode.info.id as number
-    )
+    await this.sendKillMessageToWorker(workerNodeKey)
     worker.disconnect()
     await waitWorkerExit
   }
@@ -86,14 +83,16 @@ export class FixedClusterPool<
     workerNodeKey: number,
     message: MessageValue<Data>
   ): void {
-    this.workerNodes[workerNodeKey].worker.send(message)
+    this.workerNodes[workerNodeKey].worker.send({
+      ...message,
+      workerId: this.workerNodes[workerNodeKey].info.id as number
+    })
   }
 
   /** @inheritDoc */
   protected sendStartupMessageToWorker (workerNodeKey: number): void {
     this.sendToWorker(workerNodeKey, {
-      ready: false,
-      workerId: this.workerNodes[workerNodeKey].info.id as number
+      ready: false
     })
   }
 
index 711a71bedb25205bef8b1fdbdaf4528866e92879..7efb3808fa8e02ae6c294665461453c48e1a8451 100644 (file)
@@ -1,5 +1,6 @@
 import { EventEmitter } from 'node:events'
 import { type TransferListItem } from 'node:worker_threads'
+import type { TaskFunction } from '../worker/task-functions'
 import type {
   ErrorHandler,
   ExitHandler,
@@ -260,12 +261,45 @@ export interface IPool<
    * Terminates all workers in this pool.
    */
   readonly destroy: () => Promise<void>
+  /**
+   * Whether the specified task function exists in this pool.
+   *
+   * @param name - The name of the task function.
+   * @returns `true` if the task function exists, `false` otherwise.
+   */
+  readonly hasTaskFunction: (name: string) => boolean
+  /**
+   * Adds a task function to this pool.
+   * If a task function with the same name already exists, it will be overwritten.
+   *
+   * @param name - The name of the task function.
+   * @param fn - The task function.
+   * @returns `true` if the task function was added, `false` otherwise.
+   */
+  readonly addTaskFunction: (
+    name: string,
+    fn: TaskFunction<Data, Response>
+  ) => Promise<boolean>
+  /**
+   * Removes a task function from this pool.
+   *
+   * @param name - The name of the task function.
+   * @returns `true` if the task function was removed, `false` otherwise.
+   */
+  readonly removeTaskFunction: (name: string) => Promise<boolean>
   /**
    * Lists the names of task function available in this pool.
    *
    * @returns The names of task function available in this pool.
    */
-  readonly listTaskFunctions: () => string[]
+  readonly listTaskFunctionNames: () => string[]
+  /**
+   * Sets the default task function in this pool.
+   *
+   * @param name - The name of the task function.
+   * @returns `true` if the default task function was set, `false` otherwise.
+   */
+  readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
   /**
    * Sets the worker choice strategy in this pool.
    *
index 6e234e2ea3dab6f045bfbfca84f7556afbff9f3c..2ad94a7bf16083072bf1303745834c2b7c06506a 100644 (file)
@@ -67,10 +67,7 @@ export class FixedThreadPool<
         resolve()
       })
     })
-    await this.sendKillMessageToWorker(
-      workerNodeKey,
-      workerNode.info.id as number
-    )
+    await this.sendKillMessageToWorker(workerNodeKey)
     workerNode.closeChannel()
     await worker.terminate()
     await waitWorkerExit
@@ -84,16 +81,18 @@ export class FixedThreadPool<
   ): void {
     (
       this.workerNodes[workerNodeKey].messageChannel as MessageChannel
-    ).port1.postMessage(message, transferList)
+    ).port1.postMessage(
+      { ...message, workerId: this.workerNodes[workerNodeKey].info.id },
+      transferList
+    )
   }
 
   /** @inheritDoc */
   protected sendStartupMessageToWorker (workerNodeKey: number): void {
     const workerNode = this.workerNodes[workerNodeKey]
-    const worker = workerNode.worker
     const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
       .port2
-    worker.postMessage(
+    workerNode.worker.postMessage(
       {
         ready: false,
         workerId: workerNode.info.id,
index 68c8cab2ef97070ccea235dfd391ed5479978120..e9680261b591e26707c11c5ca868873f3ca5a6e4 100644 (file)
@@ -161,21 +161,21 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
-    if (!Array.isArray(this.info.taskFunctions)) {
+    if (!Array.isArray(this.info.taskFunctionNames)) {
       throw new Error(
         `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
       )
     }
     if (
-      Array.isArray(this.info.taskFunctions) &&
-      this.info.taskFunctions.length < 3
+      Array.isArray(this.info.taskFunctionNames) &&
+      this.info.taskFunctionNames.length < 3
     ) {
       throw new Error(
         `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
       )
     }
     if (name === DEFAULT_TASK_NAME) {
-      name = this.info.taskFunctions[1]
+      name = this.info.taskFunctionNames[1]
     }
     if (!this.taskFunctionsUsage.has(name)) {
       this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
@@ -183,6 +183,11 @@ implements IWorkerNode<Worker, Data> {
     return this.taskFunctionsUsage.get(name)
   }
 
+  /** @inheritdoc */
+  public deleteTaskFunctionWorkerUsage (name: string): boolean {
+    return this.taskFunctionsUsage.delete(name)
+  }
+
   private async startOnEmptyQueue (): Promise<void> {
     if (
       this.onEmptyQueueCount > 0 &&
@@ -249,7 +254,7 @@ implements IWorkerNode<Worker, Data> {
       for (const task of this.tasksQueue) {
         if (
           (task.name === DEFAULT_TASK_NAME &&
-            name === (this.info.taskFunctions as string[])[1]) ||
+            name === (this.info.taskFunctionNames as string[])[1]) ||
           (task.name !== DEFAULT_TASK_NAME && name === task.name)
         ) {
           ++taskFunctionQueueSize
index 29050455088c6efd49d8147aed07c41f2a5a004e..4c877bc8b1dcc58871a55cd70c87871d918c4596 100644 (file)
@@ -144,7 +144,7 @@ export interface WorkerInfo {
   /**
    * Task function names.
    */
-  taskFunctions?: string[]
+  taskFunctionNames?: string[]
 }
 
 /**
@@ -315,4 +315,11 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
    */
   readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
+  /**
+   * Deletes task function worker usage statistics.
+   *
+   * @param name - The task function name.
+   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
+   */
+  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
 }
index e1fb311e45b4eb8ed0e5d33cda3b3806d7526f5b..48988c089f6717e1e4108143ce87c20bdc5536d1 100644 (file)
@@ -3,13 +3,13 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
- * Task error.
+ * Worker error.
  *
  * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
  */
-export interface TaskError<Data = unknown> {
+export interface WorkerError<Data = unknown> {
   /**
-   * Task name triggering the error.
+   * Task function name triggering the error.
    */
   readonly name: string
   /**
@@ -72,7 +72,7 @@ export interface Task<Data = unknown> {
   /**
    * Worker id.
    */
-  readonly workerId: number
+  readonly workerId?: number
   /**
    * Task name.
    */
@@ -109,17 +109,36 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
    */
   readonly kill?: KillBehavior | true | 'success' | 'failure'
   /**
-   * Task error.
+   * Worker error.
    */
-  readonly taskError?: TaskError<ErrorData>
+  readonly workerError?: WorkerError<ErrorData>
   /**
    * Task performance.
    */
   readonly taskPerformance?: TaskPerformance
+  /**
+   * Task function operation:
+   * - `'add'` - Add a task function.
+   * - `'remove'` - Remove a task function.
+   * - `'default'` - Set a task function as default.
+   */
+  readonly taskFunctionOperation?: 'add' | 'remove' | 'default'
+  /**
+   * Whether the task function operation is successful or not.
+   */
+  readonly taskFunctionOperationStatus?: boolean
+  /**
+   * Task function serialized to string.
+   */
+  readonly taskFunction?: string
+  /**
+   * Task function name.
+   */
+  readonly taskFunctionName?: string
   /**
    * Task function names.
    */
-  readonly taskFunctions?: string[]
+  readonly taskFunctionNames?: string[]
   /**
    * Whether the worker computes the given statistics or not.
    */
index f0ef59327c6099bc3719be1d84b537b0496fdd22..6c6de861156e040285fc3a4ad4c50ba5e796f04f 100644 (file)
@@ -18,6 +18,7 @@ import { KillBehaviors, type WorkerOptions } from './worker-options'
 import type {
   TaskAsyncFunction,
   TaskFunction,
+  TaskFunctionOperationReturnType,
   TaskFunctions,
   TaskSyncFunction
 } from './task-functions'
@@ -199,11 +200,14 @@ export abstract class AbstractWorker<
    *
    * @param name - The name of the task function to check.
    * @returns Whether the worker has a task function with the given name or not.
-   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
    */
-  public hasTaskFunction (name: string): boolean {
-    this.checkTaskFunctionName(name)
-    return this.taskFunctions.has(name)
+  public hasTaskFunction (name: string): TaskFunctionOperationReturnType {
+    try {
+      this.checkTaskFunctionName(name)
+    } catch (error) {
+      return { status: false, error: error as Error }
+    }
+    return { status: this.taskFunctions.has(name) }
   }
 
   /**
@@ -213,24 +217,21 @@ export abstract class AbstractWorker<
    * @param name - The name of the task function to add.
    * @param fn - The task function to add.
    * @returns Whether the task function was added or not.
-   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
-   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
    */
   public addTaskFunction (
     name: string,
     fn: TaskFunction<Data, Response>
-  ): boolean {
-    this.checkTaskFunctionName(name)
-    if (name === DEFAULT_TASK_NAME) {
-      throw new Error(
-        'Cannot add a task function with the default reserved name'
-      )
-    }
-    if (typeof fn !== 'function') {
-      throw new TypeError('fn parameter is not a function')
-    }
+  ): TaskFunctionOperationReturnType {
     try {
+      this.checkTaskFunctionName(name)
+      if (name === DEFAULT_TASK_NAME) {
+        throw new Error(
+          'Cannot add a task function with the default reserved name'
+        )
+      }
+      if (typeof fn !== 'function') {
+        throw new TypeError('fn parameter is not a function')
+      }
       const boundFn = fn.bind(this)
       if (
         this.taskFunctions.get(name) ===
@@ -239,10 +240,10 @@ export abstract class AbstractWorker<
         this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
       }
       this.taskFunctions.set(name, boundFn)
-      this.sendTaskFunctionsListToMainWorker()
-      return true
-    } catch {
-      return false
+      this.sendTaskFunctionNamesToMainWorker()
+      return { status: true }
+    } catch (error) {
+      return { status: false, error: error as Error }
     }
   }
 
@@ -251,27 +252,29 @@ export abstract class AbstractWorker<
    *
    * @param name - The name of the task function to remove.
    * @returns Whether the task function existed and was removed or not.
-   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function.
    */
-  public removeTaskFunction (name: string): boolean {
-    this.checkTaskFunctionName(name)
-    if (name === DEFAULT_TASK_NAME) {
-      throw new Error(
-        'Cannot remove the task function with the default reserved name'
-      )
-    }
-    if (
-      this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME)
-    ) {
-      throw new Error(
-        'Cannot remove the task function used as the default task function'
-      )
+  public removeTaskFunction (name: string): TaskFunctionOperationReturnType {
+    try {
+      this.checkTaskFunctionName(name)
+      if (name === DEFAULT_TASK_NAME) {
+        throw new Error(
+          'Cannot remove the task function with the default reserved name'
+        )
+      }
+      if (
+        this.taskFunctions.get(name) ===
+        this.taskFunctions.get(DEFAULT_TASK_NAME)
+      ) {
+        throw new Error(
+          'Cannot remove the task function used as the default task function'
+        )
+      }
+      const deleteStatus = this.taskFunctions.delete(name)
+      this.sendTaskFunctionNamesToMainWorker()
+      return { status: deleteStatus }
+    } catch (error) {
+      return { status: false, error: error as Error }
     }
-    const deleteStatus = this.taskFunctions.delete(name)
-    this.sendTaskFunctionsListToMainWorker()
-    return deleteStatus
   }
 
   /**
@@ -279,7 +282,7 @@ export abstract class AbstractWorker<
    *
    * @returns The names of the worker's task functions.
    */
-  public listTaskFunctions (): string[] {
+  public listTaskFunctionNames (): string[] {
     const names: string[] = [...this.taskFunctions.keys()]
     let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
     for (const [name, fn] of this.taskFunctions) {
@@ -305,30 +308,28 @@ export abstract class AbstractWorker<
    *
    * @param name - The name of the task function to use as default task function.
    * @returns Whether the default task function was set or not.
-   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function.
    */
-  public setDefaultTaskFunction (name: string): boolean {
-    this.checkTaskFunctionName(name)
-    if (name === DEFAULT_TASK_NAME) {
-      throw new Error(
-        'Cannot set the default task function reserved name as the default task function'
-      )
-    }
-    if (!this.taskFunctions.has(name)) {
-      throw new Error(
-        'Cannot set the default task function to a non-existing task function'
-      )
-    }
+  public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType {
     try {
+      this.checkTaskFunctionName(name)
+      if (name === DEFAULT_TASK_NAME) {
+        throw new Error(
+          'Cannot set the default task function reserved name as the default task function'
+        )
+      }
+      if (!this.taskFunctions.has(name)) {
+        throw new Error(
+          'Cannot set the default task function to a non-existing task function'
+        )
+      }
       this.taskFunctions.set(
         DEFAULT_TASK_NAME,
         this.taskFunctions.get(name) as TaskFunction<Data, Response>
       )
-      return true
-    } catch {
-      return false
+      this.sendTaskFunctionNamesToMainWorker()
+      return { status: true }
+    } catch (error) {
+      return { status: false, error: error as Error }
     }
   }
 
@@ -361,6 +362,9 @@ export abstract class AbstractWorker<
     } else if (message.checkActive != null) {
       // Check active message received
       message.checkActive ? this.startCheckActive() : this.stopCheckActive()
+    } else if (message.taskFunctionOperation != null) {
+      // Task function operation message received
+      this.handleTaskFunctionOperationMessage(message)
     } else if (message.taskId != null && message.data != null) {
       // Task message received
       this.run(message)
@@ -370,6 +374,39 @@ export abstract class AbstractWorker<
     }
   }
 
+  protected handleTaskFunctionOperationMessage (
+    message: MessageValue<Data>
+  ): void {
+    const { taskFunctionOperation, taskFunctionName, taskFunction } = message
+    let response!: TaskFunctionOperationReturnType
+    if (taskFunctionOperation === 'add') {
+      response = this.addTaskFunction(
+        taskFunctionName as string,
+        // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
+        new Function(`return ${taskFunction as string}`)() as TaskFunction<
+        Data,
+        Response
+        >
+      )
+    } else if (taskFunctionOperation === 'remove') {
+      response = this.removeTaskFunction(taskFunctionName as string)
+    } else if (taskFunctionOperation === 'default') {
+      response = this.setDefaultTaskFunction(taskFunctionName as string)
+    }
+    this.sendToMainWorker({
+      taskFunctionOperation,
+      taskFunctionOperationStatus: response.status,
+      taskFunctionName,
+      ...(!response.status &&
+        response?.error != null && {
+        workerError: {
+          name: taskFunctionName as string,
+          message: this.handleError(response.error as Error | string)
+        }
+      })
+    })
+  }
+
   /**
    * Handles a kill message sent by the main worker.
    *
@@ -380,11 +417,11 @@ export abstract class AbstractWorker<
     if (isAsyncFunction(this.opts.killHandler)) {
       (this.opts.killHandler?.() as Promise<void>)
         .then(() => {
-          this.sendToMainWorker({ kill: 'success', workerId: this.id })
+          this.sendToMainWorker({ kill: 'success' })
           return null
         })
         .catch(() => {
-          this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+          this.sendToMainWorker({ kill: 'failure' })
         })
         .finally(() => {
           this.emitDestroy()
@@ -394,9 +431,9 @@ export abstract class AbstractWorker<
       try {
         // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
         this.opts.killHandler?.() as void
-        this.sendToMainWorker({ kill: 'success', workerId: this.id })
+        this.sendToMainWorker({ kill: 'success' })
       } catch {
-        this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+        this.sendToMainWorker({ kill: 'failure' })
       } finally {
         this.emitDestroy()
       }
@@ -448,7 +485,7 @@ export abstract class AbstractWorker<
       performance.now() - this.lastTaskTimestamp >
       (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
     ) {
-      this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
+      this.sendToMainWorker({ kill: this.opts.killBehavior })
     }
   }
 
@@ -475,12 +512,11 @@ export abstract class AbstractWorker<
   ): void
 
   /**
-   * Sends the list of task function names to the main worker.
+   * Sends task function names to the main worker.
    */
-  protected sendTaskFunctionsListToMainWorker (): void {
+  protected sendTaskFunctionNamesToMainWorker (): void {
     this.sendToMainWorker({
-      taskFunctions: this.listTaskFunctions(),
-      workerId: this.id
+      taskFunctionNames: this.listTaskFunctionNames()
     })
   }
 
@@ -505,12 +541,11 @@ export abstract class AbstractWorker<
     const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
     if (fn == null) {
       this.sendToMainWorker({
-        taskError: {
+        workerError: {
           name: name as string,
           message: `Task function '${name as string}' not found`,
           data
         },
-        workerId: this.id,
         taskId
       })
       return
@@ -540,17 +575,15 @@ export abstract class AbstractWorker<
       this.sendToMainWorker({
         data: res,
         taskPerformance,
-        workerId: this.id,
         taskId
       })
     } catch (error) {
       this.sendToMainWorker({
-        taskError: {
+        workerError: {
           name: name as string,
           message: this.handleError(error as Error | string),
           data
         },
-        workerId: this.id,
         taskId
       })
     } finally {
@@ -576,19 +609,17 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({
           data: res,
           taskPerformance,
-          workerId: this.id,
           taskId
         })
-        return null
+        return undefined
       })
       .catch(error => {
         this.sendToMainWorker({
-          taskError: {
+          workerError: {
             name: name as string,
             message: this.handleError(error as Error | string),
             data
           },
-          workerId: this.id,
           taskId
         })
       })
index 26964aa489f83fff90f8e4306c3dc1bbafc36f23..201a516c56e1d8ab2a3d2d569c4d54d7f0fd484d 100644 (file)
@@ -48,14 +48,12 @@ export class ClusterWorker<
         this.getMainWorker().on('message', this.messageListener.bind(this))
         this.sendToMainWorker({
           ready: true,
-          taskFunctions: this.listTaskFunctions(),
-          workerId: this.id
+          taskFunctionNames: this.listTaskFunctionNames()
         })
       } catch {
         this.sendToMainWorker({
           ready: false,
-          taskFunctions: this.listTaskFunctions(),
-          workerId: this.id
+          taskFunctionNames: this.listTaskFunctionNames()
         })
       }
     }
@@ -68,6 +66,6 @@ export class ClusterWorker<
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
-    this.getMainWorker().send(message)
+    this.getMainWorker().send({ ...message, workerId: this.id })
   }
 }
index a61d69627ce6eeebd36e462f5801c36e94cc8e06..353b8b0c1f8b329d71c89e6a89bbbdd921d3d548 100644 (file)
@@ -43,3 +43,11 @@ export type TaskFunctions<Data = unknown, Response = unknown> = Record<
 string,
 TaskFunction<Data, Response>
 >
+
+/**
+ * Task function operation return type.
+ */
+export interface TaskFunctionOperationReturnType {
+  status: boolean
+  error?: Error
+}
index d6a3699220b63fad5dc3191c1b4c1ca72d112bc8..7b92caf67201795797f6e3e06fff4fc593f4ee94 100644 (file)
@@ -62,14 +62,12 @@ export class ThreadWorker<
         this.port.on('message', this.messageListener.bind(this))
         this.sendToMainWorker({
           ready: true,
-          taskFunctions: this.listTaskFunctions(),
-          workerId: this.id
+          taskFunctionNames: this.listTaskFunctionNames()
         })
       } catch {
         this.sendToMainWorker({
           ready: false,
-          taskFunctions: this.listTaskFunctions(),
-          workerId: this.id
+          taskFunctionNames: this.listTaskFunctionNames()
         })
       }
     }
@@ -89,7 +87,7 @@ export class ThreadWorker<
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
-    this.port.postMessage(message)
+    this.port.postMessage({ ...message, workerId: this.id })
   }
 
   /** @inheritDoc */
index e9c65dde91de251b6228c303861696a9f8629162..3f59396a192efce942e1fc29b2164e16edf719b7 100644 (file)
@@ -1243,34 +1243,209 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify that listTaskFunctions() is working', async () => {
+  it('Verify that hasTaskFunction() is working', async () => {
     const dynamicThreadPool = new DynamicThreadPool(
       Math.floor(numberOfWorkers / 2),
       numberOfWorkers,
       './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
     )
     await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
-    expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
+    expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
+    expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
+      true
+    )
+    expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
+    expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
+    expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
+    await dynamicThreadPool.destroy()
+    const fixedClusterPool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+    )
+    await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
+    expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
+    expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
+      true
+    )
+    expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
+    expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
+    expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
+    await fixedClusterPool.destroy()
+  })
+
+  it('Verify that addTaskFunction() is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    await expect(
+      dynamicThreadPool.addTaskFunction(0, () => {})
+    ).rejects.toThrowError(new TypeError('name argument must be a string'))
+    await expect(
+      dynamicThreadPool.addTaskFunction('', () => {})
+    ).rejects.toThrowError(
+      new TypeError('name argument must not be an empty string')
+    )
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', 0)
+    ).rejects.toThrowError(new TypeError('fn argument must be a function'))
+    await expect(
+      dynamicThreadPool.addTaskFunction('test', '')
+    ).rejects.toThrowError(new TypeError('fn argument must be a function'))
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'test'
+    ])
+    const echoTaskFunction = data => {
+      return data
+    }
+    await expect(
+      dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+    ).resolves.toBe(true)
+    expect(dynamicThreadPool.taskFunctions.size).toBe(1)
+    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
+      echoTaskFunction
+    )
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'test',
+      'echo'
+    ])
+    const taskFunctionData = { test: 'test' }
+    const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
+    expect(echoResult).toStrictEqual(taskFunctionData)
+    for (const workerNode of dynamicThreadPool.workerNodes) {
+      expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
+        tasks: {
+          executed: expect.any(Number),
+          executing: 0,
+          queued: 0,
+          stolen: 0,
+          failed: 0
+        },
+        runTime: {
+          history: new CircularArray()
+        },
+        waitTime: {
+          history: new CircularArray()
+        },
+        elu: {
+          idle: {
+            history: new CircularArray()
+          },
+          active: {
+            history: new CircularArray()
+          }
+        }
+      })
+    }
+    await dynamicThreadPool.destroy()
+  })
+
+  it('Verify that removeTaskFunction() is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testWorker.js'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'test'
+    ])
+    await expect(
+      dynamicThreadPool.removeTaskFunction('test')
+    ).rejects.toThrowError(
+      new Error('Cannot remove a task function not handled on the pool side')
+    )
+    const echoTaskFunction = data => {
+      return data
+    }
+    await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+    expect(dynamicThreadPool.taskFunctions.size).toBe(1)
+    expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
+      echoTaskFunction
+    )
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'test',
+      'echo'
+    ])
+    await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
+      true
+    )
+    expect(dynamicThreadPool.taskFunctions.size).toBe(0)
+    expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'test'
+    ])
+    await dynamicThreadPool.destroy()
+  })
+
+  it('Verify that listTaskFunctionNames() is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
       DEFAULT_TASK_NAME,
       'jsonIntegerSerialization',
       'factorial',
       'fibonacci'
     ])
+    await dynamicThreadPool.destroy()
     const fixedClusterPool = new FixedClusterPool(
       numberOfWorkers,
       './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
     )
     await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
-    expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
+    expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
       DEFAULT_TASK_NAME,
       'jsonIntegerSerialization',
       'factorial',
       'fibonacci'
     ])
-    await dynamicThreadPool.destroy()
     await fixedClusterPool.destroy()
   })
 
+  it('Verify that setDefaultTaskFunction() is working', async () => {
+    const dynamicThreadPool = new DynamicThreadPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+    )
+    await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'jsonIntegerSerialization',
+      'factorial',
+      'fibonacci'
+    ])
+    await expect(
+      dynamicThreadPool.setDefaultTaskFunction('factorial')
+    ).resolves.toBe(true)
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'factorial',
+      'jsonIntegerSerialization',
+      'fibonacci'
+    ])
+    await expect(
+      dynamicThreadPool.setDefaultTaskFunction('fibonacci')
+    ).resolves.toBe(true)
+    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'fibonacci',
+      'jsonIntegerSerialization',
+      'factorial'
+    ])
+  })
+
   it('Verify that multiple task functions worker is working', async () => {
     const pool = new DynamicClusterPool(
       Math.floor(numberOfWorkers / 2),
@@ -1289,14 +1464,14 @@ describe('Abstract pool test suite', () => {
     expect(pool.info.executingTasks).toBe(0)
     expect(pool.info.executedTasks).toBe(4)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.info.taskFunctions).toStrictEqual([
+      expect(workerNode.info.taskFunctionNames).toStrictEqual([
         DEFAULT_TASK_NAME,
         'jsonIntegerSerialization',
         'factorial',
         'fibonacci'
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
-      for (const name of pool.listTaskFunctions()) {
+      for (const name of pool.listTaskFunctionNames()) {
         expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
           tasks: {
             executed: expect.any(Number),
@@ -1327,7 +1502,9 @@ describe('Abstract pool test suite', () => {
       expect(
         workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
       ).toStrictEqual(
-        workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
+        workerNode.getTaskFunctionWorkerUsage(
+          workerNode.info.taskFunctionNames[1]
+        )
       )
     }
     await pool.destroy()
@@ -1341,11 +1518,51 @@ describe('Abstract pool test suite', () => {
     )
     const workerNodeKey = 0
     await expect(
-      pool.sendKillMessageToWorker(
-        workerNodeKey,
-        pool.workerNodes[workerNodeKey].info.id
-      )
+      pool.sendKillMessageToWorker(workerNodeKey)
     ).resolves.toBeUndefined()
     await pool.destroy()
   })
+
+  it('Verify sendTaskFunctionOperationToWorker()', async () => {
+    const pool = new DynamicClusterPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    const workerNodeKey = 0
+    await expect(
+      pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
+        taskFunctionOperation: 'add',
+        taskFunctionName: 'empty',
+        taskFunction: (() => {}).toString()
+      })
+    ).resolves.toBe(true)
+    expect(
+      pool.workerNodes[workerNodeKey].info.taskFunctionNames
+    ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+    await pool.destroy()
+  })
+
+  it('Verify sendTaskFunctionOperationToWorkers()', async () => {
+    const pool = new DynamicClusterPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    await expect(
+      pool.sendTaskFunctionOperationToWorkers({
+        taskFunctionOperation: 'add',
+        taskFunctionName: 'empty',
+        taskFunction: (() => {}).toString()
+      })
+    ).resolves.toBe(true)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.info.taskFunctionNames).toStrictEqual([
+        DEFAULT_TASK_NAME,
+        'test',
+        'empty'
+      ])
+    }
+    await pool.destroy()
+  })
 })
index 45b97c1c9251086a9d9154e01efc1b37fa9dd74b..ee469eda7b39692f4a5a26e9ba3782453c66c981 100644 (file)
@@ -139,7 +139,7 @@ describe('Worker node test suite', () => {
         "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
       )
     )
-    threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
+    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
     expect(() =>
       threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
     ).toThrowError(
@@ -147,7 +147,7 @@ describe('Worker node test suite', () => {
         "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
       )
     )
-    threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
     expect(
       threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
     ).toStrictEqual({
@@ -221,4 +221,19 @@ describe('Worker node test suite', () => {
     })
     expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
   })
+
+  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
+    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'fn1',
+      'fn2'
+    ])
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+    expect(
+      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
+    ).toBe(false)
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
+  })
 })
index 5860ebffa07aaa41f7edc7ff4c4280ce6454fed6..200b05acdff0f26a29c0eb4e2e760a64448935f5 100644 (file)
@@ -200,7 +200,7 @@ describe('Abstract worker test suite', () => {
     expect(killHandlerStub.calledOnce).toBe(true)
   })
 
-  it('Verify that handleError() method works properly', () => {
+  it('Verify that handleError() method is working properly', () => {
     const error = new Error('Error as an error')
     const worker = new ClusterWorker(() => {})
     expect(worker.handleError(error)).not.toBeInstanceOf(Error)
@@ -215,7 +215,7 @@ describe('Abstract worker test suite', () => {
     ).toThrowError('Main worker not set')
   })
 
-  it('Verify that hasTaskFunction() works', () => {
+  it('Verify that hasTaskFunction() is working', () => {
     const fn1 = () => {
       return 1
     }
@@ -223,19 +223,23 @@ describe('Abstract worker test suite', () => {
       return 2
     }
     const worker = new ClusterWorker({ fn1, fn2 })
-    expect(() => worker.hasTaskFunction(0)).toThrowError(
-      new TypeError('name parameter is not a string')
-    )
-    expect(() => worker.hasTaskFunction('')).toThrowError(
-      new TypeError('name parameter is an empty string')
-    )
-    expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
-    expect(worker.hasTaskFunction('fn1')).toBe(true)
-    expect(worker.hasTaskFunction('fn2')).toBe(true)
-    expect(worker.hasTaskFunction('fn3')).toBe(false)
+    expect(worker.hasTaskFunction(0)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is not a string')
+    })
+    expect(worker.hasTaskFunction('')).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is an empty string')
+    })
+    expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+      status: true
+    })
+    expect(worker.hasTaskFunction('fn1')).toStrictEqual({ status: true })
+    expect(worker.hasTaskFunction('fn2')).toStrictEqual({ status: true })
+    expect(worker.hasTaskFunction('fn3')).toStrictEqual({ status: false })
   })
 
-  it('Verify that addTaskFunction() works', () => {
+  it('Verify that addTaskFunction() is working', () => {
     const fn1 = () => {
       return 1
     }
@@ -246,24 +250,30 @@ describe('Abstract worker test suite', () => {
       return 3
     }
     const worker = new ThreadWorker(fn1)
-    expect(() => worker.addTaskFunction(0, fn1)).toThrowError(
-      new TypeError('name parameter is not a string')
-    )
-    expect(() => worker.addTaskFunction('', fn1)).toThrowError(
-      new TypeError('name parameter is an empty string')
-    )
-    expect(() => worker.addTaskFunction('fn3', '')).toThrowError(
-      new TypeError('fn parameter is not a function')
-    )
+    expect(worker.addTaskFunction(0, fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is not a string')
+    })
+    expect(worker.addTaskFunction('', fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is an empty string')
+    })
+    expect(worker.addTaskFunction('fn3', '')).toStrictEqual({
+      status: false,
+      error: new TypeError('fn parameter is not a function')
+    })
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
     expect(worker.taskFunctions.size).toBe(2)
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
       worker.taskFunctions.get('fn1')
     )
-    expect(() => worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toThrowError(
-      new Error('Cannot add a task function with the default reserved name')
-    )
+    expect(worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toStrictEqual({
+      status: false,
+      error: new Error(
+        'Cannot add a task function with the default reserved name'
+      )
+    })
     worker.addTaskFunction('fn2', fn2)
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
@@ -282,7 +292,7 @@ describe('Abstract worker test suite', () => {
     )
   })
 
-  it('Verify that removeTaskFunction() works', () => {
+  it('Verify that removeTaskFunction() is working', () => {
     const fn1 = () => {
       return 1
     }
@@ -290,12 +300,14 @@ describe('Abstract worker test suite', () => {
       return 2
     }
     const worker = new ClusterWorker({ fn1, fn2 })
-    expect(() => worker.removeTaskFunction(0, fn1)).toThrowError(
-      new TypeError('name parameter is not a string')
-    )
-    expect(() => worker.removeTaskFunction('', fn1)).toThrowError(
-      new TypeError('name parameter is an empty string')
-    )
+    expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is not a string')
+    })
+    expect(worker.removeTaskFunction('', fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is an empty string')
+    })
     worker.getMainWorker = sinon.stub().returns({
       id: 1,
       send: sinon.stub().returns()
@@ -307,16 +319,18 @@ describe('Abstract worker test suite', () => {
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
       worker.taskFunctions.get('fn1')
     )
-    expect(() => worker.removeTaskFunction(DEFAULT_TASK_NAME)).toThrowError(
-      new Error(
+    expect(worker.removeTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+      status: false,
+      error: new Error(
         'Cannot remove the task function with the default reserved name'
       )
-    )
-    expect(() => worker.removeTaskFunction('fn1')).toThrowError(
-      new Error(
+    })
+    expect(worker.removeTaskFunction('fn1')).toStrictEqual({
+      status: false,
+      error: new Error(
         'Cannot remove the task function used as the default task function'
       )
-    )
+    })
     worker.removeTaskFunction('fn2')
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
@@ -325,7 +339,7 @@ describe('Abstract worker test suite', () => {
     expect(worker.getMainWorker().send.calledOnce).toBe(true)
   })
 
-  it('Verify that listTaskFunctions() works', () => {
+  it('Verify that listTaskFunctionNames() is working', () => {
     const fn1 = () => {
       return 1
     }
@@ -333,14 +347,14 @@ describe('Abstract worker test suite', () => {
       return 2
     }
     const worker = new ClusterWorker({ fn1, fn2 })
-    expect(worker.listTaskFunctions()).toStrictEqual([
+    expect(worker.listTaskFunctionNames()).toStrictEqual([
       DEFAULT_TASK_NAME,
       'fn1',
       'fn2'
     ])
   })
 
-  it('Verify that setDefaultTaskFunction() works', () => {
+  it('Verify that setDefaultTaskFunction() is working', () => {
     const fn1 = () => {
       return 1
     }
@@ -348,12 +362,14 @@ describe('Abstract worker test suite', () => {
       return 2
     }
     const worker = new ThreadWorker({ fn1, fn2 })
-    expect(() => worker.setDefaultTaskFunction(0, fn1)).toThrowError(
-      new TypeError('name parameter is not a string')
-    )
-    expect(() => worker.setDefaultTaskFunction('', fn1)).toThrowError(
-      new TypeError('name parameter is an empty string')
-    )
+    expect(worker.setDefaultTaskFunction(0, fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is not a string')
+    })
+    expect(worker.setDefaultTaskFunction('', fn1)).toStrictEqual({
+      status: false,
+      error: new TypeError('name parameter is an empty string')
+    })
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
     expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
@@ -361,16 +377,18 @@ describe('Abstract worker test suite', () => {
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
       worker.taskFunctions.get('fn1')
     )
-    expect(() => worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toThrowError(
-      new Error(
+    expect(worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+      status: false,
+      error: new Error(
         'Cannot set the default task function reserved name as the default task function'
       )
-    )
-    expect(() => worker.setDefaultTaskFunction('fn3')).toThrowError(
-      new Error(
+    })
+    expect(worker.setDefaultTaskFunction('fn3')).toStrictEqual({
+      status: false,
+      error: new Error(
         'Cannot set the default task function to a non-existing task function'
       )
-    )
+    })
     worker.setDefaultTaskFunction('fn1')
     expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
       worker.taskFunctions.get('fn1')