Merge branch 'master' of github.com:poolifier/poolifier into feature/task-functions
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Sep 2023 21:46:02 +0000 (23:46 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Sep 2023 21:46:02 +0000 (23:46 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
1  2 
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
tests/pools/abstract/abstract-pool.test.js

diff --combined CHANGELOG.md
index f1857ed75645b87a41162fd8ef8317dd9288d7e2,f40bbb4f299608631aeb934920e6d34d5df26510..852f5693e1aac978bbf9451764f598f546d4eaa2
@@@ -10,12 -10,13 +10,18 @@@ and this project adheres to [Semantic V
  ### Changed
  
  - Disable publication on GitHub packages registry on release until authentication issue is fixed.
 +- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API.
 +
 +### Added
 +
 +- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
  
+ ### Added
+ - Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+ - Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
+ - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
  ## [2.6.44] - 2023-09-08
  
  ### Fixed
  - Optimize worker alive status check.
  - BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
  - Optimize `LESS_USED` worker choice strategy.
- - Update benchmarks versus external threads pools.
+ - Update benchmark versus external threads pools.
  - Optimize tasks usage statistics requirements for worker choice strategy.
  
  ### Fixed
  - Optimize worker alive status check.
  - BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
  - Optimize `LESS_USED` worker choice strategy.
- - Update benchmarks versus external threads pools.
+ - Update benchmark versus external threads pools.
  
  ### Fixed
  
  ### Changed
  
  - Optimize fair share task scheduling algorithm implementation.
- - Update benchmarks versus external pools results with latest version.
+ - Update benchmark versus external pools results with latest version.
  
  ## [2.3.3] - 2022-10-15
  
diff --combined docs/api.md
index ad8ede83926db63389c9f8640d3036bc99815703,7939123b1e64b0bcbe5c75103aeb3f93a47ba830..20d10edf90313edc75259739a5995849bddcb250
@@@ -6,8 -6,9 +6,9 @@@
    - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
    - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
    - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
+   - [`pool.start()`](#poolstart)
    - [`pool.destroy()`](#pooldestroy)
 -  - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
 +  - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
    - [`PoolOptions`](#pooloptions)
      - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
      - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
@@@ -16,7 -17,7 +17,7 @@@
      - [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
      - [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
      - [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
 -    - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions)
 +    - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
      - [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
  
  ## Pool
  
  This method is available on both pool implementations and returns a promise with the task function execution response.
  
+ ### `pool.start()`
+ This method is available on both pool implementations and will start the minimum number of workers.
  ### `pool.destroy()`
  
  This method is available on both pool implementations and will call the terminate method on each worker.
  
 -### `pool.listTaskFunctions()`
 +### `pool.listTaskFunctionNames()`
  
  This method is available on both pool implementations and returns an array of the task function names.
  
  
  An object with these properties:
  
- - `onlineHandler` (optional) - A function that will listen for online event on each worker
- - `messageHandler` (optional) - A function that will listen for message event on each worker
- - `errorHandler` (optional) - A function that will listen for error event on each worker
- - `exitHandler` (optional) - A function that will listen for exit event on each worker
+ - `onlineHandler` (optional) - A function that will listen for online event on each worker.  
+   Default: `() => {}`
+ - `messageHandler` (optional) - A function that will listen for message event on each worker.  
+   Default: `() => {}`
+ - `errorHandler` (optional) - A function that will listen for error event on each worker.  
+   Default: `() => {}`
+ - `exitHandler` (optional) - A function that will listen for exit event on each worker.  
+   Default: `() => {}`
  - `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
  
    - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
@@@ -83,6 -93,8 +93,8 @@@
  
    Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
  
+ - `startWorkers` (optional) - Start the minimum number of workers at pool creation.  
+   Default: `true`
  - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.  
    Default: `true`
  - `enableEvents` (optional) - Events emission enablement in this pool.  
  
    - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
    - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
+   - `taskStealing` (optional) - Task stealing enablement.
+   - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure.
  
-   Default: `{ size: (pool maximum size)^2, concurrency: 1 }`
+   Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
  
  #### `ThreadPoolOptions extends PoolOptions`
  
@@@ -149,7 -163,7 +163,7 @@@ This method is available on both worke
  
  This method is available on both worker implementations and returns a boolean.
  
 -#### `YourWorker.listTaskFunctions()`
 +#### `YourWorker.listTaskFunctionNames()`
  
  This method is available on both worker implementations and returns an array of the task function names.
  
index e122e88180bce90a32710f0d8f6bd8537d57b8b9,fb1bf8458ad7a970f27ae7a4dcbb300faf5bf848..6761882589bb14bd9d205c33684ac7e7b3ab5128
@@@ -21,7 -21,6 +21,7 @@@ import 
    updateMeasurementStatistics
  } from '../utils'
  import { KillBehaviors } from '../worker/worker-options'
 +import type { TaskFunction } from '../worker/task-functions'
  import {
    type IPool,
    PoolEmitter,
@@@ -69,7 -68,8 +69,7 @@@ export abstract class AbstractPool
    public readonly emitter?: PoolEmitter
  
    /**
 -   * The task execution response promise map.
 -   *
 +   * The task execution response promise map:
     * - `key`: The message id of each submitted task.
     * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
     *
     */
    protected readonly max?: number
  
-   /**
-    * Whether the pool is starting or not.
-    */
-   private readonly starting: boolean
 +  /**
 +   * The task functions added at runtime map:
 +   * - `key`: The task function name.
 +   * - `value`: The task function itself.
 +   */
 +  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
 +
    /**
     * Whether the pool is started or not.
     */
    private started: boolean
+   /**
+    * Whether the pool is starting or not.
+    */
+   private starting: boolean
    /**
     * The start timestamp of the pool.
     */
  
      this.setupHook()
  
-     this.starting = true
-     this.startPool()
 +    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
 +
+     this.started = false
      this.starting = false
-     this.started = true
+     if (this.opts.startWorkers === true) {
+       this.start()
+     }
  
      this.startTimestamp = performance.now()
    }
  
    private checkPoolOptions (opts: PoolOptions<Worker>): void {
      if (isPlainObject(opts)) {
+       this.opts.startWorkers = opts.startWorkers ?? true
        this.opts.workerChoiceStrategy =
          opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
        this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
          `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
        )
      }
-     if (tasksQueueOptions?.queueMaxSize != null) {
-       throw new Error(
-         'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-       )
-     }
      if (
        tasksQueueOptions?.size != null &&
        !Number.isSafeInteger(tasksQueueOptions?.size)
      }
    }
  
-   private startPool (): void {
-     while (
-       this.workerNodes.reduce(
-         (accumulator, workerNode) =>
-           !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-         0
-       ) < this.numberOfWorkers
-     ) {
-       this.createAndSetupWorkerNode()
-     }
-   }
    /** @inheritDoc */
    public get info (): PoolInfo {
      return {
        version,
        type: this.type,
        worker: this.worker,
+       started: this.started,
        ready: this.ready,
        strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
        minSize: this.minSize,
     * @param workerId - The worker id.
     * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
     */
 -  private getWorkerNodeKeyByWorkerId (workerId: number): number {
 +  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
      return this.workerNodes.findIndex(
        workerNode => workerNode.info.id === workerId
      )
      return {
        ...{
          size: Math.pow(this.maxSize, 2),
-         concurrency: 1
+         concurrency: 1,
+         taskStealing: true,
+         tasksStealingOnBackPressure: true
        },
        ...tasksQueueOptions
      }
                (this.opts.tasksQueueOptions?.concurrency as number)
          ) === -1
        )
 -    } else {
 -      return (
 -        this.workerNodes.findIndex(
 -          workerNode =>
 -            workerNode.info.ready && workerNode.usage.tasks.executing === 0
 -        ) === -1
 -      )
      }
 +    return (
 +      this.workerNodes.findIndex(
 +        workerNode =>
 +          workerNode.info.ready && workerNode.usage.tasks.executing === 0
 +      ) === -1
 +    )
 +  }
 +
 +  private async sendTaskFunctionOperationToWorker (
 +    workerNodeKey: number,
 +    message: MessageValue<Data>
 +  ): Promise<boolean> {
 +    const workerId = this.getWorkerInfo(workerNodeKey).id as number
 +    return await new Promise<boolean>((resolve, reject) => {
 +      this.registerWorkerMessageListener(workerNodeKey, message => {
 +        if (
 +          message.workerId === workerId &&
 +          message.taskFunctionOperationStatus === true
 +        ) {
 +          resolve(true)
 +        } else if (
 +          message.workerId === workerId &&
 +          message.taskFunctionOperationStatus === false
 +        ) {
 +          reject(
 +            new Error(
 +              `Task function operation ${
 +                message.taskFunctionOperation as string
 +              } failed on worker ${message.workerId}`
 +            )
 +          )
 +        }
 +      })
 +      this.sendToWorker(workerNodeKey, message)
 +    })
 +  }
 +
 +  private async sendTaskFunctionOperationToWorkers (
 +    message: Omit<MessageValue<Data>, 'workerId'>
 +  ): Promise<boolean> {
 +    return await new Promise<boolean>((resolve, reject) => {
 +      const responsesReceived = new Array<MessageValue<Data | Response>>()
 +      for (const [workerNodeKey] of this.workerNodes.entries()) {
 +        this.registerWorkerMessageListener(workerNodeKey, message => {
 +          if (message.taskFunctionOperationStatus != null) {
 +            responsesReceived.push(message)
 +            if (
 +              responsesReceived.length === this.workerNodes.length &&
 +              responsesReceived.every(
 +                message => message.taskFunctionOperationStatus === true
 +              )
 +            ) {
 +              resolve(true)
 +            } else if (
 +              responsesReceived.length === this.workerNodes.length &&
 +              responsesReceived.some(
 +                message => message.taskFunctionOperationStatus === false
 +              )
 +            ) {
 +              reject(
 +                new Error(
 +                  `Task function operation ${
 +                    message.taskFunctionOperation as string
 +                  } failed on worker ${message.workerId as number}`
 +                )
 +              )
 +            }
 +          }
 +        })
 +        this.sendToWorker(workerNodeKey, message)
 +      }
 +    })
    }
  
    /** @inheritDoc */
 -  public listTaskFunctions (): string[] {
 +  public hasTaskFunction (name: string): boolean {
      for (const workerNode of this.workerNodes) {
        if (
 -        Array.isArray(workerNode.info.taskFunctions) &&
 -        workerNode.info.taskFunctions.length > 0
 +        Array.isArray(workerNode.info.taskFunctionNames) &&
 +        workerNode.info.taskFunctionNames.includes(name)
        ) {
 -        return workerNode.info.taskFunctions
 +        return true
 +      }
 +    }
 +    return false
 +  }
 +
 +  /** @inheritDoc */
 +  public async addTaskFunction (
 +    name: string,
 +    taskFunction: TaskFunction<Data, Response>
 +  ): Promise<boolean> {
 +    this.taskFunctions.set(name, taskFunction)
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'add',
 +      taskFunctionName: name,
 +      taskFunction: taskFunction.toString()
 +    })
 +  }
 +
 +  /** @inheritDoc */
 +  public async removeTaskFunction (name: string): Promise<boolean> {
 +    this.taskFunctions.delete(name)
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'remove',
 +      taskFunctionName: name
 +    })
 +  }
 +
 +  /** @inheritDoc */
 +  public listTaskFunctionNames (): string[] {
 +    for (const workerNode of this.workerNodes) {
 +      if (
 +        Array.isArray(workerNode.info.taskFunctionNames) &&
 +        workerNode.info.taskFunctionNames.length > 0
 +      ) {
 +        return workerNode.info.taskFunctionNames
        }
      }
      return []
    }
  
 +  /** @inheritDoc */
 +  public async setDefaultTaskFunction (name: string): Promise<boolean> {
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'default',
 +      taskFunctionName: name
 +    })
 +  }
 +
    private shallExecuteTask (workerNodeKey: number): boolean {
      return (
        this.tasksQueueSize(workerNodeKey) === 0 &&
    ): Promise<Response> {
      return await new Promise<Response>((resolve, reject) => {
        if (!this.started) {
-         reject(new Error('Cannot execute a task on destroyed pool'))
+         reject(new Error('Cannot execute a task on not started pool'))
          return
        }
        if (name != null && typeof name !== 'string') {
          data: data ?? ({} as Data),
          transferList,
          timestamp,
 -        workerId: this.getWorkerInfo(workerNodeKey).id as number,
          taskId: randomUUID()
        }
        this.promiseResponseMap.set(task.taskId as string, {
      })
    }
  
+   /** @inheritdoc */
+   public start (): void {
+     this.starting = true
+     while (
+       this.workerNodes.reduce(
+         (accumulator, workerNode) =>
+           !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+         0
+       ) < this.numberOfWorkers
+     ) {
+       this.createAndSetupWorkerNode()
+     }
+     this.starting = false
+     this.started = true
+   }
    /** @inheritDoc */
    public async destroy (): Promise<void> {
      await Promise.all(
    }
  
    protected async sendKillMessageToWorker (
 -    workerNodeKey: number,
 -    workerId: number
 +    workerNodeKey: number
    ): Promise<void> {
      await new Promise<void>((resolve, reject) => {
        this.registerWorkerMessageListener(workerNodeKey, message => {
          if (message.kill === 'success') {
            resolve()
          } else if (message.kill === 'failure') {
 -          reject(new Error(`Worker ${workerId} kill message handling failed`))
 +          reject(
 +            new Error(
 +              `Worker ${
 +                message.workerId as number
 +              } kill message handling failed`
 +            )
 +          )
          }
        })
 -      this.sendToWorker(workerNodeKey, { kill: true, workerId })
 +      this.sendToWorker(workerNodeKey, { kill: true })
      })
    }
  
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      return (
        workerInfo != null &&
 -      Array.isArray(workerInfo.taskFunctions) &&
 -      workerInfo.taskFunctions.length > 2
 +      Array.isArray(workerInfo.taskFunctionNames) &&
 +      workerInfo.taskFunctionNames.length > 2
      )
    }
  
      ) {
        --workerTaskStatistics.executing
      }
 -    if (message.taskError == null) {
 +    if (message.workerError == null) {
        ++workerTaskStatistics.executed
      } else {
        ++workerTaskStatistics.failed
      workerUsage: WorkerUsage,
      message: MessageValue<Response>
    ): void {
 -    if (message.taskError != null) {
 +    if (message.workerError != null) {
        return
      }
      updateMeasurementStatistics(
      workerUsage: WorkerUsage,
      message: MessageValue<Response>
    ): void {
 -    if (message.taskError != null) {
 +    if (message.workerError != null) {
        return
      }
      const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
      })
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      this.sendToWorker(workerNodeKey, {
 -      checkActive: true,
 -      workerId: workerInfo.id as number
 +      checkActive: true
      })
 +    if (this.taskFunctions.size > 0) {
 +      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
 +        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
 +          taskFunctionOperation: 'add',
 +          taskFunctionName,
 +          taskFunction: taskFunction.toString()
 +        }).catch(error => {
 +          this.emitter?.emit(PoolEvents.error, error)
 +        })
 +      }
 +    }
      workerInfo.dynamic = true
      if (
        this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
      // Send the statistics message to worker.
      this.sendStatisticsMessageToWorker(workerNodeKey)
      if (this.opts.enableTasksQueue === true) {
-       this.workerNodes[workerNodeKey].onEmptyQueue =
-         this.taskStealingOnEmptyQueue.bind(this)
-       this.workerNodes[workerNodeKey].onBackPressure =
-         this.tasksStealingOnBackPressure.bind(this)
+       if (this.opts.tasksQueueOptions?.taskStealing === true) {
+         this.workerNodes[workerNodeKey].onEmptyQueue =
+           this.taskStealingOnEmptyQueue.bind(this)
+       }
+       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+         this.workerNodes[workerNodeKey].onBackPressure =
+           this.tasksStealingOnBackPressure.bind(this)
+       }
      }
    }
  
              .runTime.aggregate,
          elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
            .elu.aggregate
 -      },
 -      workerId: this.getWorkerInfo(workerNodeKey).id as number
 +      }
      })
    }
  
          },
          0
        )
 -      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
 -      const task = {
 -        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
 -        workerId: destinationWorkerNode.info.id as number
 -      }
 +      const task = this.dequeueTask(workerNodeKey) as Task<Data>
        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
          this.executeTask(destinationWorkerNodeKey, task)
        } else {
  
    private taskStealingOnEmptyQueue (workerId: number): void {
      const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
 -    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
      const workerNodes = this.workerNodes
        .slice()
        .sort(
          workerNode.usage.tasks.queued > 0
      )
      if (sourceWorkerNode != null) {
 -      const task = {
 -        ...(sourceWorkerNode.popTask() as Task<Data>),
 -        workerId: destinationWorkerNode.info.id as number
 -      }
 +      const task = sourceWorkerNode.popTask() as Task<Data>
        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
          this.executeTask(destinationWorkerNodeKey, task)
        } else {
          workerNode.usage.tasks.queued <
            (this.opts.tasksQueueOptions?.size as number) - sizeOffset
        ) {
 -        const task = {
 -          ...(sourceWorkerNode.popTask() as Task<Data>),
 -          workerId: workerNode.info.id as number
 -        }
 +        const task = sourceWorkerNode.popTask() as Task<Data>
          if (this.shallExecuteTask(workerNodeKey)) {
            this.executeTask(workerNodeKey, task)
          } else {
    protected workerListener (): (message: MessageValue<Response>) => void {
      return message => {
        this.checkMessageWorkerId(message)
 -      if (message.ready != null && message.taskFunctions != null) {
 +      if (message.ready != null && message.taskFunctionNames != null) {
          // Worker ready response received from worker
          this.handleWorkerReadyResponse(message)
        } else if (message.taskId != null) {
          // Task execution response received from worker
          this.handleTaskExecutionResponse(message)
 -      } else if (message.taskFunctions != null) {
 -        // Task functions message received from worker
 +      } else if (message.taskFunctionNames != null) {
 +        // Task function names message received from worker
          this.getWorkerInfo(
            this.getWorkerNodeKeyByWorkerId(message.workerId)
 -        ).taskFunctions = message.taskFunctions
 +        ).taskFunctionNames = message.taskFunctionNames
        }
      }
    }
  
    private handleWorkerReadyResponse (message: MessageValue<Response>): void {
      if (message.ready === false) {
 -      throw new Error(`Worker ${message.workerId} failed to initialize`)
 +      throw new Error(
 +        `Worker ${message.workerId as number} failed to initialize`
 +      )
      }
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.ready = message.ready as boolean
 -    workerInfo.taskFunctions = message.taskFunctions
 +    workerInfo.taskFunctionNames = message.taskFunctionNames
      if (this.emitter != null && this.ready) {
        this.emitter.emit(PoolEvents.ready, this.info)
      }
    }
  
    private handleTaskExecutionResponse (message: MessageValue<Response>): void {
 -    const { taskId, taskError, data } = message
 +    const { taskId, workerError, data } = message
      const promiseResponse = this.promiseResponseMap.get(taskId as string)
      if (promiseResponse != null) {
 -      if (taskError != null) {
 -        this.emitter?.emit(PoolEvents.taskError, taskError)
 -        promiseResponse.reject(taskError.message)
 +      if (workerError != null) {
 +        this.emitter?.emit(PoolEvents.taskError, workerError)
 +        promiseResponse.reject(workerError.message)
        } else {
          promiseResponse.resolve(data as Response)
        }
diff --combined src/pools/pool.ts
index cb7580898656ccd58a379edf24cdc55631301f34,b5eec21c714798099076be2dab22f64aec4722ac..ddd4c2a047f9563e2ac8911f6bac0a4ec063ff9e
@@@ -1,6 -1,5 +1,6 @@@
  import { EventEmitter } from 'node:events'
  import { type TransferListItem } from 'node:worker_threads'
 +import type { TaskFunction } from '../worker/task-functions'
  import type {
    ErrorHandler,
    ExitHandler,
@@@ -64,6 -63,7 +64,7 @@@ export interface PoolInfo 
    readonly version: string
    readonly type: PoolType
    readonly worker: WorkerType
+   readonly started: boolean
    readonly ready: boolean
    readonly strategy: WorkerChoiceStrategy
    readonly minSize: number
@@@ -107,16 -107,24 +108,24 @@@ export interface TasksQueueOptions 
     * @defaultValue (pool maximum size)^2
     */
    readonly size?: number
-   /**
-    * @deprecated Use `size` instead.
-    */
-   readonly queueMaxSize?: number
    /**
     * Maximum number of tasks that can be executed concurrently on a worker node.
     *
     * @defaultValue 1
     */
    readonly concurrency?: number
+   /**
+    * Whether to enable task stealing.
+    *
+    * @defaultValue true
+    */
+   readonly taskStealing?: boolean
+   /**
+    * Whether to enable tasks stealing on back pressure.
+    *
+    * @defaultValue true
+    */
+   readonly tasksStealingOnBackPressure?: boolean
  }
  
  /**
@@@ -141,6 -149,12 +150,12 @@@ export interface PoolOptions<Worker ext
     * A function that will listen for exit event on each worker.
     */
    exitHandler?: ExitHandler<Worker>
+   /**
+    * Whether to start the minimum number of workers at pool initialization.
+    *
+    * @defaultValue false
+    */
+   startWorkers?: boolean
    /**
     * The worker choice strategy to use in this pool.
     *
@@@ -230,49 -244,20 +245,53 @@@ export interface IPool
      name?: string,
      transferList?: TransferListItem[]
    ) => Promise<Response>
+   /**
+    * Starts the minimum number of workers in this pool.
+    */
+   readonly start: () => void
    /**
     * Terminates all workers in this pool.
     */
    readonly destroy: () => Promise<void>
 +  /**
 +   * Whether the specified task function exists in this pool.
 +   *
 +   * @param name - The name of the task function.
 +   * @returns `true` if the task function exists, `false` otherwise.
 +   */
 +  readonly hasTaskFunction: (name: string) => boolean
 +  /**
 +   * Adds a task function to this pool.
 +   * If a task function with the same name already exists, it will be overwritten.
 +   *
 +   * @param name - The name of the task function.
 +   * @param taskFunction - The task function.
 +   * @returns `true` if the task function was added, `false` otherwise.
 +   */
 +  readonly addTaskFunction: (
 +    name: string,
 +    taskFunction: TaskFunction<Data, Response>
 +  ) => Promise<boolean>
 +  /**
 +   * Removes a task function from this pool.
 +   *
 +   * @param name - The name of the task function.
 +   * @returns `true` if the task function was removed, `false` otherwise.
 +   */
 +  readonly removeTaskFunction: (name: string) => Promise<boolean>
    /**
     * Lists the names of task function available in this pool.
     *
     * @returns The names of task function available in this pool.
     */
 -  readonly listTaskFunctions: () => string[]
 +  readonly listTaskFunctionNames: () => string[]
 +  /**
 +   * Sets the default task function in this pool.
 +   *
 +   * @param name - The name of the task function.
 +   * @returns `true` if the default task function was set, `false` otherwise.
 +   */
 +  readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
    /**
     * Sets the worker choice strategy in this pool.
     *
diff --combined src/pools/worker-node.ts
index 78756560c0fbaa78fedb96aad7153a21d14e61ff,68c8cab2ef97070ccea235dfd391ed5479978120..ed6677027f810474aa9aa01e768fd5f483250679
@@@ -46,6 -46,7 +46,7 @@@ implements IWorkerNode<Worker, Data> 
    /** @inheritdoc */
    public onEmptyQueue?: WorkerNodeEventCallback
    private readonly tasksQueue: Deque<Task<Data>>
+   private onBackPressureStarted: boolean
    private onEmptyQueueCount: number
    private readonly taskFunctionsUsage: Map<string, WorkerUsage>
  
@@@ -65,6 -66,7 +66,7 @@@
      }
      this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
      this.tasksQueue = new Deque<Task<Data>>()
+     this.onBackPressureStarted = false
      this.onEmptyQueueCount = 0
      this.taskFunctionsUsage = new Map<string, WorkerUsage>()
    }
    /** @inheritdoc */
    public enqueueTask (task: Task<Data>): number {
      const tasksQueueSize = this.tasksQueue.push(task)
-     if (this.onBackPressure != null && this.hasBackPressure()) {
+     if (
+       this.onBackPressure != null &&
+       this.hasBackPressure() &&
+       !this.onBackPressureStarted
+     ) {
+       this.onBackPressureStarted = true
        this.onBackPressure(this.info.id as number)
+       this.onBackPressureStarted = false
      }
      return tasksQueueSize
    }
    /** @inheritdoc */
    public unshiftTask (task: Task<Data>): number {
      const tasksQueueSize = this.tasksQueue.unshift(task)
-     if (this.onBackPressure != null && this.hasBackPressure()) {
+     if (
+       this.onBackPressure != null &&
+       this.hasBackPressure() &&
+       !this.onBackPressureStarted
+     ) {
+       this.onBackPressureStarted = true
        this.onBackPressure(this.info.id as number)
+       this.onBackPressureStarted = false
      }
      return tasksQueueSize
    }
    /** @inheritdoc */
    public dequeueTask (): Task<Data> | undefined {
      const task = this.tasksQueue.shift()
-     if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+     if (
+       this.onEmptyQueue != null &&
+       this.tasksQueue.size === 0 &&
+       this.onEmptyQueueCount === 0
+     ) {
        this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
      }
      return task
    /** @inheritdoc */
    public popTask (): Task<Data> | undefined {
      const task = this.tasksQueue.pop()
-     if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+     if (
+       this.onEmptyQueue != null &&
+       this.tasksQueue.size === 0 &&
+       this.onEmptyQueueCount === 0
+     ) {
        this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
      }
      return task
  
    /** @inheritdoc */
    public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
 -    if (!Array.isArray(this.info.taskFunctions)) {
 +    if (!Array.isArray(this.info.taskFunctionNames)) {
        throw new Error(
          `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
        )
      }
      if (
 -      Array.isArray(this.info.taskFunctions) &&
 -      this.info.taskFunctions.length < 3
 +      Array.isArray(this.info.taskFunctionNames) &&
 +      this.info.taskFunctionNames.length < 3
      ) {
        throw new Error(
          `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
        )
      }
      if (name === DEFAULT_TASK_NAME) {
 -      name = this.info.taskFunctions[1]
 +      name = this.info.taskFunctionNames[1]
      }
      if (!this.taskFunctionsUsage.has(name)) {
        this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
        this.onEmptyQueueCount = 0
        return
      }
-     (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
      ++this.onEmptyQueueCount
+     this.onEmptyQueue?.(this.info.id as number)
      await sleep(exponentialDelay(this.onEmptyQueueCount))
      await this.startOnEmptyQueue()
    }
        for (const task of this.tasksQueue) {
          if (
            (task.name === DEFAULT_TASK_NAME &&
 -            name === (this.info.taskFunctions as string[])[1]) ||
 +            name === (this.info.taskFunctionNames as string[])[1]) ||
            (task.name !== DEFAULT_TASK_NAME && name === task.name)
          ) {
            ++taskFunctionQueueSize
index f2b50667106643134a1a84d9b7be2ef8004e4404,719acd766781fed8c6f9c3e46b4cc82d1a3e28a2..46f48be526ca3832fd9f46dcde13bb9779f86eea
@@@ -16,6 -16,7 +16,7 @@@ const { Deque } = require('../../../lib
  const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
  const { version } = require('../../../package.json')
  const { waitPoolEvents } = require('../../test-utils')
+ const { WorkerNode } = require('../../../lib/pools/worker-node')
  
  describe('Abstract pool test suite', () => {
    const numberOfWorkers = 2
        './tests/worker-files/thread/testWorker.js'
      )
      expect(pool.emitter).toBeInstanceOf(EventEmitter)
-     expect(pool.opts.enableEvents).toBe(true)
-     expect(pool.opts.restartWorkerOnError).toBe(true)
-     expect(pool.opts.enableTasksQueue).toBe(false)
-     expect(pool.opts.tasksQueueOptions).toBeUndefined()
-     expect(pool.opts.workerChoiceStrategy).toBe(
-       WorkerChoiceStrategies.ROUND_ROBIN
-     )
-     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-       retries: 6,
-       runTime: { median: false },
-       waitTime: { median: false },
-       elu: { median: false }
+     expect(pool.opts).toStrictEqual({
+       startWorkers: true,
+       enableEvents: true,
+       restartWorkerOnError: true,
+       enableTasksQueue: false,
+       workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+       workerChoiceStrategyOptions: {
+         retries: 6,
+         runTime: { median: false },
+         waitTime: { median: false },
+         elu: { median: false }
+       }
      })
      expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
        retries: 6,
          elu: { median: false }
        })
      }
-     expect(pool.opts.messageHandler).toBeUndefined()
-     expect(pool.opts.errorHandler).toBeUndefined()
-     expect(pool.opts.onlineHandler).toBeUndefined()
-     expect(pool.opts.exitHandler).toBeUndefined()
      await pool.destroy()
      const testHandler = () => console.info('test handler executed')
      pool = new FixedThreadPool(
        }
      )
      expect(pool.emitter).toBeUndefined()
-     expect(pool.opts.enableEvents).toBe(false)
-     expect(pool.opts.restartWorkerOnError).toBe(false)
-     expect(pool.opts.enableTasksQueue).toBe(true)
-     expect(pool.opts.tasksQueueOptions).toStrictEqual({
-       concurrency: 2,
-       size: 4
-     })
-     expect(pool.opts.workerChoiceStrategy).toBe(
-       WorkerChoiceStrategies.LEAST_USED
-     )
-     expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-       retries: 6,
-       runTime: { median: true },
-       waitTime: { median: false },
-       elu: { median: false },
-       weights: { 0: 300, 1: 200 }
+     expect(pool.opts).toStrictEqual({
+       startWorkers: true,
+       enableEvents: false,
+       restartWorkerOnError: false,
+       enableTasksQueue: true,
+       tasksQueueOptions: {
+         concurrency: 2,
+         size: 4,
+         taskStealing: true,
+         tasksStealingOnBackPressure: true
+       },
+       workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
+       workerChoiceStrategyOptions: {
+         retries: 6,
+         runTime: { median: true },
+         waitTime: { median: false },
+         elu: { median: false },
+         weights: { 0: 300, 1: 200 }
+       },
+       onlineHandler: testHandler,
+       messageHandler: testHandler,
+       errorHandler: testHandler,
+       exitHandler: testHandler
      })
      expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
        retries: 6,
          weights: { 0: 300, 1: 200 }
        })
      }
-     expect(pool.opts.messageHandler).toStrictEqual(testHandler)
-     expect(pool.opts.errorHandler).toStrictEqual(testHandler)
-     expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
-     expect(pool.opts.exitHandler).toStrictEqual(testHandler)
      await pool.destroy()
    })
  
      ).toThrowError(
        new TypeError('Invalid worker node tasks concurrency: must be an integer')
      )
-     expect(
-       () =>
-         new FixedThreadPool(
-           numberOfWorkers,
-           './tests/worker-files/thread/testWorker.js',
-           {
-             enableTasksQueue: true,
-             tasksQueueOptions: { queueMaxSize: 2 }
-           }
-         )
-     ).toThrowError(
-       new Error(
-         'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-       )
-     )
      expect(
        () =>
          new FixedThreadPool(
      expect(pool.opts.enableTasksQueue).toBe(true)
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        concurrency: 1,
-       size: 4
+       size: 4,
+       taskStealing: true,
+       tasksStealingOnBackPressure: true
      })
      pool.enableTasksQueue(true, { concurrency: 2 })
      expect(pool.opts.enableTasksQueue).toBe(true)
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        concurrency: 2,
-       size: 4
+       size: 4,
+       taskStealing: true,
+       tasksStealingOnBackPressure: true
      })
      pool.enableTasksQueue(false)
      expect(pool.opts.enableTasksQueue).toBe(false)
      )
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        concurrency: 1,
-       size: 4
+       size: 4,
+       taskStealing: true,
+       tasksStealingOnBackPressure: true
      })
      pool.setTasksQueueOptions({ concurrency: 2 })
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        concurrency: 2,
-       size: 4
+       size: 4,
+       taskStealing: true,
+       tasksStealingOnBackPressure: true
      })
      expect(() =>
        pool.setTasksQueueOptions('invalidTasksQueueOptions')
      expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
        new TypeError('Invalid worker node tasks concurrency: must be an integer')
      )
-     expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError(
-       new Error(
-         'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-       )
-     )
      expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
        new RangeError(
          'Invalid worker node tasks queue size: 0 is a negative integer or zero'
        version,
        type: PoolTypes.fixed,
        worker: WorkerTypes.thread,
+       started: true,
        ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: numberOfWorkers,
        version,
        type: PoolTypes.dynamic,
        worker: WorkerTypes.cluster,
+       started: true,
        ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: Math.floor(numberOfWorkers / 2),
        './tests/worker-files/cluster/testWorker.js'
      )
      for (const workerNode of pool.workerNodes) {
+       expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.usage).toStrictEqual({
          tasks: {
            executed: 0,
        './tests/worker-files/cluster/testWorker.js'
      )
      for (const workerNode of pool.workerNodes) {
-       expect(workerNode.tasksQueue).toBeDefined()
+       expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
        expect(workerNode.tasksQueue.size).toBe(0)
        expect(workerNode.tasksQueue.maxSize).toBe(0)
        './tests/worker-files/thread/testWorker.js'
      )
      for (const workerNode of pool.workerNodes) {
-       expect(workerNode.tasksQueue).toBeDefined()
+       expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
        expect(workerNode.tasksQueue.size).toBe(0)
        expect(workerNode.tasksQueue.maxSize).toBe(0)
        './tests/worker-files/cluster/testWorker.js'
      )
      for (const workerNode of pool.workerNodes) {
+       expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.info).toStrictEqual({
          id: expect.any(Number),
          type: WorkerTypes.cluster,
        './tests/worker-files/thread/testWorker.js'
      )
      for (const workerNode of pool.workerNodes) {
+       expect(workerNode).toBeInstanceOf(WorkerNode)
        expect(workerNode.info).toStrictEqual({
          id: expect.any(Number),
          type: WorkerTypes.thread,
      await pool.destroy()
    })
  
+   it('Verify that pool can be started after initialization', async () => {
+     const pool = new FixedClusterPool(
+       numberOfWorkers,
+       './tests/worker-files/cluster/testWorker.js',
+       {
+         startWorkers: false
+       }
+     )
+     expect(pool.info.started).toBe(false)
+     expect(pool.info.ready).toBe(false)
+     expect(pool.workerNodes).toStrictEqual([])
+     await expect(pool.execute()).rejects.toThrowError(
+       new Error('Cannot execute a task on not started pool')
+     )
+     pool.start()
+     expect(pool.info.started).toBe(true)
+     expect(pool.info.ready).toBe(true)
+     expect(pool.workerNodes.length).toBe(numberOfWorkers)
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode).toBeInstanceOf(WorkerNode)
+     }
+     await pool.destroy()
+   })
    it('Verify that pool execute() arguments are checked', async () => {
      const pool = new FixedClusterPool(
        numberOfWorkers,
        "Task function 'unknown' not found"
      )
      await pool.destroy()
-     await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
-       new Error('Cannot execute a task on destroyed pool')
+     await expect(pool.execute()).rejects.toThrowError(
+       new Error('Cannot execute a task on not started pool')
      )
    })
  
        version,
        type: PoolTypes.dynamic,
        worker: WorkerTypes.cluster,
+       started: true,
        ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: expect.any(Number),
        version,
        type: PoolTypes.fixed,
        worker: WorkerTypes.thread,
-       ready: expect.any(Boolean),
+       started: true,
+       ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: expect.any(Number),
        maxSize: expect.any(Number),
        version,
        type: PoolTypes.dynamic,
        worker: WorkerTypes.thread,
-       ready: expect.any(Boolean),
+       started: true,
+       ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: expect.any(Number),
        maxSize: expect.any(Number),
        version,
        type: PoolTypes.fixed,
        worker: WorkerTypes.thread,
-       ready: expect.any(Boolean),
+       started: true,
+       ready: true,
        strategy: WorkerChoiceStrategies.ROUND_ROBIN,
        minSize: expect.any(Number),
        maxSize: expect.any(Number),
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
      )
      await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
 -    expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
 +    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
        DEFAULT_TASK_NAME,
        'jsonIntegerSerialization',
        'factorial',
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
      await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
 -    expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
 +    expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
        DEFAULT_TASK_NAME,
        'jsonIntegerSerialization',
        'factorial',
      expect(pool.info.executingTasks).toBe(0)
      expect(pool.info.executedTasks).toBe(4)
      for (const workerNode of pool.workerNodes) {
 -      expect(workerNode.info.taskFunctions).toStrictEqual([
 +      expect(workerNode.info.taskFunctionNames).toStrictEqual([
          DEFAULT_TASK_NAME,
          'jsonIntegerSerialization',
          'factorial',
          'fibonacci'
        ])
        expect(workerNode.taskFunctionsUsage.size).toBe(3)
 -      for (const name of pool.listTaskFunctions()) {
 +      for (const name of pool.listTaskFunctionNames()) {
          expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
            tasks: {
              executed: expect.any(Number),
        expect(
          workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
        ).toStrictEqual(
 -        workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
 +        workerNode.getTaskFunctionWorkerUsage(
 +          workerNode.info.taskFunctionNames[1]
 +        )
        )
      }
      await pool.destroy()