build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
index b43d44a59eea1cb1c7990e660b96cb4cd5483a03..bc77874ff423bb3ee112f6150db81962ae5d6e69 100644 (file)
@@ -1,48 +1,64 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
-import { existsSync } from 'node:fs'
+import type { TransferListItem } from 'node:worker_threads'
+import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
 import type {
   MessageValue,
   PromiseResponseWrapper,
   Task
-} from '../utility-types'
+} from '../utility-types.js'
 import {
   DEFAULT_TASK_NAME,
-  DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  average,
+  exponentialDelay,
   isKillBehavior,
   isPlainObject,
+  max,
   median,
-  round
-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
+  min,
+  round,
+  sleep
+} from '../utils.js'
+import { KillBehaviors } from '../worker/worker-options.js'
+import type { TaskFunction } from '../worker/task-functions.js'
 import {
   type IPool,
-  PoolEmitter,
   PoolEvents,
   type PoolInfo,
   type PoolOptions,
   type PoolType,
   PoolTypes,
   type TasksQueueOptions
-} from './pool'
+} from './pool.js'
 import type {
   IWorker,
   IWorkerNode,
-  MessageHandler,
   WorkerInfo,
-  WorkerType,
-  WorkerUsage
-} from './worker'
+  WorkerNodeEventDetail,
+  WorkerType
+} from './worker.js'
 import {
   Measurements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
   type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { version } from './version.js'
+import { WorkerNode } from './worker-node.js'
+import {
+  checkFilePath,
+  checkValidTasksQueueOptions,
+  checkValidWorkerChoiceStrategy,
+  getDefaultTasksQueueOptions,
+  updateEluWorkerUsage,
+  updateRunTimeWorkerUsage,
+  updateTaskStatisticsWorkerUsage,
+  updateWaitTimeWorkerUsage,
+  waitWorkerNodeEvents
+} from './utils.js'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -60,30 +76,50 @@ export abstract class AbstractPool<
   public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
 
   /** @inheritDoc */
-  public readonly emitter?: PoolEmitter
+  public emitter?: EventEmitterAsyncResource
 
   /**
-   * The execution response promise map.
-   *
+   * The task execution response promise map:
    * - `key`: The message id of each submitted task.
    * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
    *
    * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
    */
-  protected promiseResponseMap: Map<
-  string,
-  PromiseResponseWrapper<Worker, Response>
-  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
+  protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
+    new Map<string, PromiseResponseWrapper<Response>>()
 
   /**
    * Worker choice strategy context referencing a worker choice algorithm implementation.
    */
-  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+  protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
   Worker,
   Data,
   Response
   >
 
+  /**
+   * The task functions added at runtime map:
+   * - `key`: The task function name.
+   * - `value`: The task function itself.
+   */
+  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
+  /**
+   * Whether the pool is started or not.
+   */
+  private started: boolean
+  /**
+   * Whether the pool is starting or not.
+   */
+  private starting: boolean
+  /**
+   * Whether the pool is destroying or not.
+   */
+  private destroying: boolean
+  /**
+   * Whether the pool ready event has been emitted or not.
+   */
+  private readyEventEmitted: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -92,29 +128,33 @@ export abstract class AbstractPool<
   /**
    * Constructs a new poolifier pool.
    *
-   * @param numberOfWorkers - Number of workers that this pool should manage.
+   * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
    * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
+   * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
    */
   public constructor (
-    protected readonly numberOfWorkers: number,
+    protected readonly minimumNumberOfWorkers: number,
     protected readonly filePath: string,
-    protected readonly opts: PoolOptions<Worker>
+    protected readonly opts: PoolOptions<Worker>,
+    protected readonly maximumNumberOfWorkers?: number
   ) {
     if (!this.isMain()) {
-      throw new Error('Cannot start a pool from a worker!')
+      throw new Error(
+        'Cannot start a pool from a worker with the same type as the pool'
+      )
     }
-    this.checkNumberOfWorkers(this.numberOfWorkers)
-    this.checkFilePath(this.filePath)
+    this.checkPoolType()
+    checkFilePath(this.filePath)
+    this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
     this.checkPoolOptions(this.opts)
 
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
-      this.emitter = new PoolEmitter()
+      this.initializeEventEmitter()
     }
     this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
     Worker,
@@ -128,88 +168,66 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
-    while (
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-        0
-      ) < this.numberOfWorkers
-    ) {
-      this.createAndSetupWorker()
+    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
+    this.started = false
+    this.starting = false
+    this.destroying = false
+    this.readyEventEmitted = false
+    if (this.opts.startWorkers === true) {
+      this.start()
     }
 
     this.startTimestamp = performance.now()
   }
 
-  private checkFilePath (filePath: string): void {
-    if (
-      filePath == null ||
-      typeof filePath !== 'string' ||
-      (typeof filePath === 'string' && filePath.trim().length === 0)
-    ) {
-      throw new Error('Please specify a file with a worker implementation')
-    }
-    if (!existsSync(filePath)) {
-      throw new Error(`Cannot find the worker file '${filePath}'`)
+  private checkPoolType (): void {
+    if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+      throw new Error(
+        'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+      )
     }
   }
 
-  private checkNumberOfWorkers (numberOfWorkers: number): void {
-    if (numberOfWorkers == null) {
+  private checkMinimumNumberOfWorkers (
+    minimumNumberOfWorkers: number | undefined
+  ): void {
+    if (minimumNumberOfWorkers == null) {
       throw new Error(
         'Cannot instantiate a pool without specifying the number of workers'
       )
-    } else if (!Number.isSafeInteger(numberOfWorkers)) {
+    } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
       throw new TypeError(
         'Cannot instantiate a pool with a non safe integer number of workers'
       )
-    } else if (numberOfWorkers < 0) {
+    } else if (minimumNumberOfWorkers < 0) {
       throw new RangeError(
         'Cannot instantiate a pool with a negative number of workers'
       )
-    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+    } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
       throw new RangeError('Cannot instantiate a fixed pool with zero worker')
     }
   }
 
-  protected checkDynamicPoolSize (min: number, max: number): void {
-    if (this.type === PoolTypes.dynamic) {
-      if (min > max) {
-        throw new RangeError(
-          'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
-        )
-      } else if (max === 0) {
-        throw new RangeError(
-          'Cannot instantiate a dynamic pool with a pool size equal to zero'
-        )
-      } else if (min === max) {
-        throw new RangeError(
-          'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
-        )
-      }
-    }
-  }
-
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
     if (isPlainObject(opts)) {
+      this.opts.startWorkers = opts.startWorkers ?? true
+      checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
-      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
       this.checkValidWorkerChoiceStrategyOptions(
-        this.opts.workerChoiceStrategyOptions
+        opts.workerChoiceStrategyOptions
       )
+      if (opts.workerChoiceStrategyOptions != null) {
+        this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
+      }
       this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
       this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
       if (this.opts.enableTasksQueue) {
-        this.checkValidTasksQueueOptions(
-          opts.tasksQueueOptions as TasksQueueOptions
-        )
+        checkValidTasksQueueOptions(opts.tasksQueueOptions)
         this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
-          opts.tasksQueueOptions as TasksQueueOptions
+          opts.tasksQueueOptions
         )
       }
     } else {
@@ -217,34 +235,28 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkValidWorkerChoiceStrategy (
-    workerChoiceStrategy: WorkerChoiceStrategy
-  ): void {
-    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
-      throw new Error(
-        `Invalid worker choice strategy '${workerChoiceStrategy}'`
-      )
-    }
-  }
-
   private checkValidWorkerChoiceStrategyOptions (
-    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
   ): void {
-    if (!isPlainObject(workerChoiceStrategyOptions)) {
+    if (
+      workerChoiceStrategyOptions != null &&
+      !isPlainObject(workerChoiceStrategyOptions)
+    ) {
       throw new TypeError(
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
     if (
-      workerChoiceStrategyOptions.weights != null &&
-      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+      workerChoiceStrategyOptions?.weights != null &&
+      Object.keys(workerChoiceStrategyOptions.weights).length !==
+        (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
     ) {
       throw new Error(
         'Invalid worker choice strategy options: must have a weight for each worker node'
       )
     }
     if (
-      workerChoiceStrategyOptions.measurement != null &&
+      workerChoiceStrategyOptions?.measurement != null &&
       !Object.values(Measurements).includes(
         workerChoiceStrategyOptions.measurement
       )
@@ -255,28 +267,10 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkValidTasksQueueOptions (
-    tasksQueueOptions: TasksQueueOptions
-  ): void {
-    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
-      throw new TypeError('Invalid tasks queue options: must be a plain object')
-    }
-    if (
-      tasksQueueOptions?.concurrency != null &&
-      !Number.isSafeInteger(tasksQueueOptions.concurrency)
-    ) {
-      throw new TypeError(
-        'Invalid worker tasks concurrency: must be an integer'
-      )
-    }
-    if (
-      tasksQueueOptions?.concurrency != null &&
-      tasksQueueOptions.concurrency <= 0
-    ) {
-      throw new Error(
-        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
-      )
-    }
+  private initializeEventEmitter (): void {
+    this.emitter = new EventEmitterAsyncResource({
+      name: `poolifier:${this.type}-${this.worker}-pool`
+    })
   }
 
   /** @inheritDoc */
@@ -285,14 +279,19 @@ export abstract class AbstractPool<
       version,
       type: this.type,
       worker: this.worker,
+      started: this.started,
       ready: this.ready,
-      strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
-      minSize: this.minSize,
-      maxSize: this.maxSize,
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-        .runTime.aggregate &&
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      strategy: this.opts.workerChoiceStrategy!,
+      strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
+      minSize: this.minimumNumberOfWorkers,
+      maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+        .runTime.aggregate === true &&
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .waitTime.aggregate && { utilization: round(this.utilization) }),
+          .waitTime.aggregate && {
+        utilization: round(this.utilization)
+      }),
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
@@ -301,9 +300,16 @@ export abstract class AbstractPool<
             : accumulator,
         0
       ),
+      ...(this.opts.enableTasksQueue === true && {
+        stealingWorkerNodes: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            workerNode.info.stealing ? accumulator + 1 : accumulator,
+          0
+        )
+      }),
       busyWorkerNodes: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+        (accumulator, _workerNode, workerNodeKey) =>
+          this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
         0
       ),
       executedTasks: this.workerNodes.reduce(
@@ -316,97 +322,115 @@ export abstract class AbstractPool<
           accumulator + workerNode.usage.tasks.executing,
         0
       ),
-      queuedTasks: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          accumulator + workerNode.usage.tasks.queued,
-        0
-      ),
-      maxQueuedTasks: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
-        0
-      ),
+      ...(this.opts.enableTasksQueue === true && {
+        queuedTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.queued,
+          0
+        )
+      }),
+      ...(this.opts.enableTasksQueue === true && {
+        maxQueuedTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
+          0
+        )
+      }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
+      ...(this.opts.enableTasksQueue === true && {
+        stolenTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.stolen,
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
         0
       ),
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-        .runTime.aggregate && {
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+        .runTime.aggregate === true && {
         runTime: {
           minimum: round(
-            Math.min(
+            min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+                workerNode => workerNode.usage.runTime.minimum ?? Infinity
               )
             )
           ),
           maximum: round(
-            Math.max(
+            max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+                workerNode => workerNode.usage.runTime.maximum ?? -Infinity
               )
             )
           ),
-          average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
-                (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .runTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
+                )
               )
-          ),
+            )
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .runTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  workerNode => workerNode.usage.runTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.runTime.history),
+                  []
                 )
               )
             )
           })
         }
       }),
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-        .waitTime.aggregate && {
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+        .waitTime.aggregate === true && {
         waitTime: {
           minimum: round(
-            Math.min(
+            min(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+                workerNode => workerNode.usage.waitTime.minimum ?? Infinity
               )
             )
           ),
           maximum: round(
-            Math.max(
+            max(
               ...this.workerNodes.map(
-                workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+                workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
               )
             )
           ),
-          average: round(
-            this.workerNodes.reduce(
-              (accumulator, workerNode) =>
-                accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
-              0
-            ) /
-              this.workerNodes.reduce(
-                (accumulator, workerNode) =>
-                  accumulator + (workerNode.usage.tasks?.executed ?? 0),
-                0
+          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+            .waitTime.average && {
+            average: round(
+              average(
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
+                )
               )
-          ),
+            )
+          }),
           ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
             .waitTime.median && {
             median: round(
               median(
-                this.workerNodes.map(
-                  workerNode => workerNode.usage.waitTime?.median ?? 0
+                this.workerNodes.reduce<number[]>(
+                  (accumulator, workerNode) =>
+                    accumulator.concat(workerNode.usage.waitTime.history),
+                  []
                 )
               )
             )
@@ -416,17 +440,13 @@ export abstract class AbstractPool<
     }
   }
 
-  private get starting (): boolean {
-    return (
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-        0
-      ) < this.minSize
-    )
-  }
-
+  /**
+   * The pool readiness boolean status.
+   */
   private get ready (): boolean {
+    if (this.empty) {
+      return false
+    }
     return (
       this.workerNodes.reduce(
         (accumulator, workerNode) =>
@@ -434,75 +454,61 @@ export abstract class AbstractPool<
             ? accumulator + 1
             : accumulator,
         0
-      ) >= this.minSize
+      ) >= this.minimumNumberOfWorkers
     )
   }
 
   /**
-   * Gets the approximate pool utilization.
+   * The pool emptiness boolean status.
+   */
+  protected get empty (): boolean {
+    return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+  }
+
+  /**
+   * The approximate pool utilization.
    *
    * @returns The pool utilization.
    */
   private get utilization (): number {
     const poolTimeCapacity =
-      (performance.now() - this.startTimestamp) * this.maxSize
+      (performance.now() - this.startTimestamp) *
+      (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
     const totalTasksRunTime = this.workerNodes.reduce(
       (accumulator, workerNode) =>
-        accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+        accumulator + (workerNode.usage.runTime.aggregate ?? 0),
       0
     )
     const totalTasksWaitTime = this.workerNodes.reduce(
       (accumulator, workerNode) =>
-        accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+        accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
       0
     )
     return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
   }
 
   /**
-   * Pool type.
+   * The pool type.
    *
    * If it is `'dynamic'`, it provides the `max` property.
    */
   protected abstract get type (): PoolType
 
   /**
-   * Gets the worker type.
+   * The worker type.
    */
   protected abstract get worker (): WorkerType
 
-  /**
-   * Pool minimum size.
-   */
-  protected abstract get minSize (): number
-
-  /**
-   * Pool maximum size.
-   */
-  protected abstract get maxSize (): number
-
-  /**
-   * Get the worker given its id.
-   *
-   * @param workerId - The worker id.
-   * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
-   */
-  private getWorkerById (workerId: number): Worker | undefined {
-    return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
-      ?.worker
-  }
-
   /**
    * Checks if the worker id sent in the received message from a worker is valid.
    *
    * @param message - The received message.
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
    */
-  private checkMessageWorkerId (message: MessageValue<Response>): void {
-    if (
-      message.workerId != null &&
-      this.getWorkerById(message.workerId) == null
-    ) {
+  private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
+    if (message.workerId == null) {
+      throw new Error('Worker message received without worker id')
+    } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
       throw new Error(
         `Worker message received from unknown worker '${message.workerId}'`
       )
@@ -510,14 +516,14 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Gets the given worker its worker node key.
+   * Gets the worker node key given its worker id.
    *
-   * @param worker - The worker.
-   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
+   * @param workerId - The worker id.
+   * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKey (worker: Worker): number {
+  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
     return this.workerNodes.findIndex(
-      workerNode => workerNode.worker === worker
+      workerNode => workerNode.info.id === workerId
     )
   }
 
@@ -526,27 +532,29 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy,
     workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
   ): void {
-    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
+    checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
-    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+    this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
       this.opts.workerChoiceStrategy
     )
     if (workerChoiceStrategyOptions != null) {
       this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     }
-    for (const workerNode of this.workerNodes) {
+    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
       workerNode.resetUsage()
-      this.setWorkerStatistics(workerNode.worker)
+      this.sendStatisticsMessageToWorker(workerNodeKey)
     }
   }
 
   /** @inheritDoc */
   public setWorkerChoiceStrategyOptions (
-    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
-    this.workerChoiceStrategyContext.setOptions(
+    if (workerChoiceStrategyOptions != null) {
+      this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    }
+    this.workerChoiceStrategyContext?.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
   }
@@ -557,28 +565,88 @@ export abstract class AbstractPool<
     tasksQueueOptions?: TasksQueueOptions
   ): void {
     if (this.opts.enableTasksQueue === true && !enable) {
+      this.unsetTaskStealing()
+      this.unsetTasksStealingOnBackPressure()
       this.flushTasksQueues()
     }
     this.opts.enableTasksQueue = enable
-    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+    this.setTasksQueueOptions(tasksQueueOptions)
   }
 
   /** @inheritDoc */
-  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+  public setTasksQueueOptions (
+    tasksQueueOptions: TasksQueueOptions | undefined
+  ): void {
     if (this.opts.enableTasksQueue === true) {
-      this.checkValidTasksQueueOptions(tasksQueueOptions)
+      checkValidTasksQueueOptions(tasksQueueOptions)
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
+      if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.unsetTaskStealing()
+        this.setTaskStealing()
+      } else {
+        this.unsetTaskStealing()
+      }
+      if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.unsetTasksStealingOnBackPressure()
+        this.setTasksStealingOnBackPressure()
+      } else {
+        this.unsetTasksStealingOnBackPressure()
+      }
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
   }
 
   private buildTasksQueueOptions (
-    tasksQueueOptions: TasksQueueOptions
+    tasksQueueOptions: TasksQueueOptions | undefined
   ): TasksQueueOptions {
     return {
-      concurrency: tasksQueueOptions?.concurrency ?? 1
+      ...getDefaultTasksQueueOptions(
+        this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+      ),
+      ...tasksQueueOptions
+    }
+  }
+
+  private setTasksQueueSize (size: number): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.tasksQueueBackPressureSize = size
+    }
+  }
+
+  private setTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
+    }
+  }
+
+  private unsetTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].off(
+        'idle',
+        this.handleWorkerNodeIdleEvent
+      )
+    }
+  }
+
+  private setTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].on(
+        'backPressure',
+        this.handleWorkerNodeBackPressureEvent
+      )
+    }
+  }
+
+  private unsetTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].off(
+        'backPressure',
+        this.handleWorkerNodeBackPressureEvent
+      )
     }
   }
 
@@ -588,7 +656,10 @@ export abstract class AbstractPool<
    * The pool filling boolean status.
    */
   protected get full (): boolean {
-    return this.workerNodes.length >= this.maxSize
+    return (
+      this.workerNodes.length >=
+      (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+    )
   }
 
   /**
@@ -599,307 +670,529 @@ export abstract class AbstractPool<
   protected abstract get busy (): boolean
 
   /**
-   * Whether worker nodes are executing at least one task.
+   * Whether worker nodes are executing concurrently their tasks quota or not.
    *
    * @returns Worker nodes busyness boolean status.
    */
   protected internalBusy (): boolean {
+    if (this.opts.enableTasksQueue === true) {
+      return (
+        this.workerNodes.findIndex(
+          workerNode =>
+            workerNode.info.ready &&
+            workerNode.usage.tasks.executing <
+              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+              this.opts.tasksQueueOptions!.concurrency!
+        ) === -1
+      )
+    }
     return (
-      this.workerNodes.findIndex(workerNode => {
-        return workerNode.usage.tasks.executing === 0
-      }) === -1
+      this.workerNodes.findIndex(
+        workerNode =>
+          workerNode.info.ready && workerNode.usage.tasks.executing === 0
+      ) === -1
     )
   }
 
-  /** @inheritDoc */
-  public async execute (data?: Data, name?: string): Promise<Response> {
-    const timestamp = performance.now()
-    const workerNodeKey = this.chooseWorkerNode()
-    const submittedTask: Task<Data> = {
-      name: name ?? DEFAULT_TASK_NAME,
-      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
-      data: data ?? ({} as Data),
-      timestamp,
-      workerId: this.getWorkerInfo(workerNodeKey).id as number,
-      id: randomUUID()
-    }
-    const res = new Promise<Response>((resolve, reject) => {
-      this.promiseResponseMap.set(submittedTask.id as string, {
-        resolve,
-        reject,
-        worker: this.workerNodes[workerNodeKey].worker
-      })
-    })
-    if (
-      this.opts.enableTasksQueue === true &&
-      (this.busy ||
+  private isWorkerNodeBusy (workerNodeKey: number): boolean {
+    if (this.opts.enableTasksQueue === true) {
+      return (
         this.workerNodes[workerNodeKey].usage.tasks.executing >=
-          ((this.opts.tasksQueueOptions as TasksQueueOptions)
-            .concurrency as number))
-    ) {
-      this.enqueueTask(workerNodeKey, submittedTask)
-    } else {
-      this.executeTask(workerNodeKey, submittedTask)
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.opts.tasksQueueOptions!.concurrency!
+      )
     }
-    this.checkAndEmitEvents()
-    // eslint-disable-next-line @typescript-eslint/return-await
-    return res
+    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
   }
 
-  /** @inheritDoc */
-  public async destroy (): Promise<void> {
-    await Promise.all(
-      this.workerNodes.map(async (workerNode, workerNodeKey) => {
-        this.flushTasksQueue(workerNodeKey)
-        // FIXME: wait for tasks to be finished
-        const workerExitPromise = new Promise<void>(resolve => {
-          workerNode.worker.on('exit', () => {
-            resolve()
-          })
-        })
-        await this.destroyWorker(workerNode.worker)
-        await workerExitPromise
-      })
-    )
+  private async sendTaskFunctionOperationToWorker (
+    workerNodeKey: number,
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const taskFunctionOperationListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        const workerId = this.getWorkerInfo(workerNodeKey)?.id
+        if (
+          message.taskFunctionOperationStatus != null &&
+          message.workerId === workerId
+        ) {
+          if (message.taskFunctionOperationStatus) {
+            resolve(true)
+          } else {
+            reject(
+              new Error(
+                `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
+              )
+            )
+          }
+          this.deregisterWorkerMessageListener(
+            this.getWorkerNodeKeyByWorkerId(message.workerId),
+            taskFunctionOperationListener
+          )
+        }
+      }
+      this.registerWorkerMessageListener(
+        workerNodeKey,
+        taskFunctionOperationListener
+      )
+      this.sendToWorker(workerNodeKey, message)
+    })
   }
 
-  /**
-   * Terminates the given worker.
-   *
-   * @param worker - A worker within `workerNodes`.
-   */
-  protected abstract destroyWorker (worker: Worker): void | Promise<void>
-
-  /**
-   * Setup hook to execute code before worker nodes are created in the abstract constructor.
-   * Can be overridden.
-   *
-   * @virtual
-   */
-  protected setupHook (): void {
-    // Intentionally empty
+  private async sendTaskFunctionOperationToWorkers (
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const responsesReceived = new Array<MessageValue<Response>>()
+      const taskFunctionOperationsListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        if (message.taskFunctionOperationStatus != null) {
+          responsesReceived.push(message)
+          if (responsesReceived.length === this.workerNodes.length) {
+            if (
+              responsesReceived.every(
+                message => message.taskFunctionOperationStatus === true
+              )
+            ) {
+              resolve(true)
+            } else if (
+              responsesReceived.some(
+                message => message.taskFunctionOperationStatus === false
+              )
+            ) {
+              const errorResponse = responsesReceived.find(
+                response => response.taskFunctionOperationStatus === false
+              )
+              reject(
+                new Error(
+                  `Task function operation '${
+                    message.taskFunctionOperation as string
+                  }' failed on worker ${errorResponse?.workerId} with error: '${
+                    errorResponse?.workerError?.message
+                  }'`
+                )
+              )
+            }
+            this.deregisterWorkerMessageListener(
+              this.getWorkerNodeKeyByWorkerId(message.workerId),
+              taskFunctionOperationsListener
+            )
+          }
+        }
+      }
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(
+          workerNodeKey,
+          taskFunctionOperationsListener
+        )
+        this.sendToWorker(workerNodeKey, message)
+      }
+    })
   }
 
-  /**
-   * Should return whether the worker is the main worker or not.
-   */
-  protected abstract isMain (): boolean
-
-  /**
-   * Hook executed before the worker task execution.
-   * Can be overridden.
-   *
-   * @param workerNodeKey - The worker node key.
-   * @param task - The task to execute.
-   */
-  protected beforeTaskExecutionHook (
-    workerNodeKey: number,
-    task: Task<Data>
-  ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    ++workerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+  /** @inheritDoc */
+  public hasTaskFunction (name: string): boolean {
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.includes(name)
+      ) {
+        return true
+      }
+    }
+    return false
   }
 
-  /**
-   * Hook executed after the worker task execution.
-   * Can be overridden.
-   *
-   * @param worker - The worker.
-   * @param message - The received message.
-   */
-  protected afterTaskExecutionHook (
-    worker: Worker,
-    message: MessageValue<Response>
-  ): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
-    this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
-  }
-
-  private updateTaskStatisticsWorkerUsage (
-    workerUsage: WorkerUsage,
-    message: MessageValue<Response>
-  ): void {
-    const workerTaskStatistics = workerUsage.tasks
-    --workerTaskStatistics.executing
-    if (message.taskError == null) {
-      ++workerTaskStatistics.executed
-    } else {
-      ++workerTaskStatistics.failed
+  /** @inheritDoc */
+  public async addTaskFunction (
+    name: string,
+    fn: TaskFunction<Data, Response>
+  ): Promise<boolean> {
+    if (typeof name !== 'string') {
+      throw new TypeError('name argument must be a string')
+    }
+    if (typeof name === 'string' && name.trim().length === 0) {
+      throw new TypeError('name argument must not be an empty string')
     }
+    if (typeof fn !== 'function') {
+      throw new TypeError('fn argument must be a function')
+    }
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'add',
+      taskFunctionName: name,
+      taskFunction: fn.toString()
+    })
+    this.taskFunctions.set(name, fn)
+    return opResult
   }
 
-  private updateRunTimeWorkerUsage (
-    workerUsage: WorkerUsage,
-    message: MessageValue<Response>
-  ): void {
-    if (
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
-        .aggregate
-    ) {
-      const taskRunTime = message.taskPerformance?.runTime ?? 0
-      workerUsage.runTime.aggregate =
-        (workerUsage.runTime.aggregate ?? 0) + taskRunTime
-      workerUsage.runTime.minimum = Math.min(
-        taskRunTime,
-        workerUsage.runTime?.minimum ?? Infinity
-      )
-      workerUsage.runTime.maximum = Math.max(
-        taskRunTime,
-        workerUsage.runTime?.maximum ?? -Infinity
+  /** @inheritDoc */
+  public async removeTaskFunction (name: string): Promise<boolean> {
+    if (!this.taskFunctions.has(name)) {
+      throw new Error(
+        'Cannot remove a task function not handled on the pool side'
       )
-      if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
-          .average &&
-        workerUsage.tasks.executed !== 0
-      ) {
-        workerUsage.runTime.average =
-          workerUsage.runTime.aggregate / workerUsage.tasks.executed
-      }
-      if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
-          .median &&
-        message.taskPerformance?.runTime != null
-      ) {
-        workerUsage.runTime.history.push(message.taskPerformance.runTime)
-        workerUsage.runTime.median = median(workerUsage.runTime.history)
-      }
     }
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'remove',
+      taskFunctionName: name
+    })
+    this.deleteTaskFunctionWorkerUsages(name)
+    this.taskFunctions.delete(name)
+    return opResult
   }
 
-  private updateWaitTimeWorkerUsage (
-    workerUsage: WorkerUsage,
-    task: Task<Data>
-  ): void {
-    const timestamp = performance.now()
-    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
-    if (
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
-        .aggregate
-    ) {
-      workerUsage.waitTime.aggregate =
-        (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
-      workerUsage.waitTime.minimum = Math.min(
-        taskWaitTime,
-        workerUsage.waitTime?.minimum ?? Infinity
-      )
-      workerUsage.waitTime.maximum = Math.max(
-        taskWaitTime,
-        workerUsage.waitTime?.maximum ?? -Infinity
-      )
+  /** @inheritDoc */
+  public listTaskFunctionNames (): string[] {
+    for (const workerNode of this.workerNodes) {
       if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .waitTime.average &&
-        workerUsage.tasks.executed !== 0
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.length > 0
       ) {
-        workerUsage.waitTime.average =
-          workerUsage.waitTime.aggregate / workerUsage.tasks.executed
+        return workerNode.info.taskFunctionNames
+      }
+    }
+    return []
+  }
+
+  /** @inheritDoc */
+  public async setDefaultTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorkers({
+      taskFunctionOperation: 'default',
+      taskFunctionName: name
+    })
+  }
+
+  private deleteTaskFunctionWorkerUsages (name: string): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
+  }
+
+  private shallExecuteTask (workerNodeKey: number): boolean {
+    return (
+      this.tasksQueueSize(workerNodeKey) === 0 &&
+      this.workerNodes[workerNodeKey].usage.tasks.executing <
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.opts.tasksQueueOptions!.concurrency!
+    )
+  }
+
+  /** @inheritDoc */
+  public async execute (
+    data?: Data,
+    name?: string,
+    transferList?: TransferListItem[]
+  ): Promise<Response> {
+    return await new Promise<Response>((resolve, reject) => {
+      if (!this.started) {
+        reject(new Error('Cannot execute a task on not started pool'))
+        return
+      }
+      if (this.destroying) {
+        reject(new Error('Cannot execute a task on destroying pool'))
+        return
+      }
+      if (name != null && typeof name !== 'string') {
+        reject(new TypeError('name argument must be a string'))
+        return
       }
       if (
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .waitTime.median
+        name != null &&
+        typeof name === 'string' &&
+        name.trim().length === 0
       ) {
-        workerUsage.waitTime.history.push(taskWaitTime)
-        workerUsage.waitTime.median = median(workerUsage.waitTime.history)
+        reject(new TypeError('name argument must not be an empty string'))
+        return
+      }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+        return
       }
+      const timestamp = performance.now()
+      const workerNodeKey = this.chooseWorkerNode()
+      const task: Task<Data> = {
+        name: name ?? DEFAULT_TASK_NAME,
+        // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+        data: data ?? ({} as Data),
+        transferList,
+        timestamp,
+        taskId: randomUUID()
+      }
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.promiseResponseMap.set(task.taskId!, {
+        resolve,
+        reject,
+        workerNodeKey,
+        ...(this.emitter != null && {
+          asyncResource: new AsyncResource('poolifier:task', {
+            triggerAsyncId: this.emitter.asyncId,
+            requireManualDestroy: true
+          })
+        })
+      })
+      if (
+        this.opts.enableTasksQueue === false ||
+        (this.opts.enableTasksQueue === true &&
+          this.shallExecuteTask(workerNodeKey))
+      ) {
+        this.executeTask(workerNodeKey, task)
+      } else {
+        this.enqueueTask(workerNodeKey, task)
+      }
+    })
+  }
+
+  /** @inheritdoc */
+  public start (): void {
+    if (this.started) {
+      throw new Error('Cannot start an already started pool')
+    }
+    if (this.starting) {
+      throw new Error('Cannot start an already starting pool')
+    }
+    if (this.destroying) {
+      throw new Error('Cannot start a destroying pool')
+    }
+    this.starting = true
+    while (
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+        0
+      ) < this.minimumNumberOfWorkers
+    ) {
+      this.createAndSetupWorkerNode()
+    }
+    this.starting = false
+    this.started = true
+  }
+
+  /** @inheritDoc */
+  public async destroy (): Promise<void> {
+    if (!this.started) {
+      throw new Error('Cannot destroy an already destroyed pool')
+    }
+    if (this.starting) {
+      throw new Error('Cannot destroy an starting pool')
+    }
+    if (this.destroying) {
+      throw new Error('Cannot destroy an already destroying pool')
+    }
+    this.destroying = true
+    await Promise.all(
+      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+        await this.destroyWorkerNode(workerNodeKey)
+      })
+    )
+    this.emitter?.emit(PoolEvents.destroy, this.info)
+    this.emitter?.emitDestroy()
+    this.emitter?.removeAllListeners()
+    this.readyEventEmitted = false
+    this.destroying = false
+    this.started = false
+  }
+
+  private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
+    await new Promise<void>((resolve, reject) => {
+      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+      if (this.workerNodes[workerNodeKey] == null) {
+        resolve()
+        return
+      }
+      const killMessageListener = (message: MessageValue<Response>): void => {
+        this.checkMessageWorkerId(message)
+        if (message.kill === 'success') {
+          resolve()
+        } else if (message.kill === 'failure') {
+          reject(
+            new Error(
+              `Kill message handling failed on worker ${message.workerId}`
+            )
+          )
+        }
+      }
+      // FIXME: should be registered only once
+      this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
+      this.sendToWorker(workerNodeKey, { kill: true })
+    })
+  }
+
+  /**
+   * Terminates the worker node given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   */
+  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flagWorkerNodeAsNotReady(workerNodeKey)
+    const flushedTasks = this.flushTasksQueue(workerNodeKey)
+    const workerNode = this.workerNodes[workerNodeKey]
+    await waitWorkerNodeEvents(
+      workerNode,
+      'taskFinished',
+      flushedTasks,
+      this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+        getDefaultTasksQueueOptions(
+          this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+        ).tasksFinishedTimeout
+    )
+    await this.sendKillMessageToWorker(workerNodeKey)
+    await workerNode.terminate()
+  }
+
+  /**
+   * Setup hook to execute code before worker nodes are created in the abstract constructor.
+   * Can be overridden.
+   *
+   * @virtual
+   */
+  protected setupHook (): void {
+    /* Intentionally empty */
+  }
+
+  /**
+   * Should return whether the worker is the main worker or not.
+   */
+  protected abstract isMain (): boolean
+
+  /**
+   * Hook executed before the worker task execution.
+   * Can be overridden.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param task - The task to execute.
+   */
+  protected beforeTaskExecutionHook (
+    workerNodeKey: number,
+    task: Task<Data>
+  ): void {
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      ++workerUsage.tasks.executing
+      updateWaitTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        task
+      )
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+        null
+    ) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      const taskFunctionWorkerUsage = this.workerNodes[
+        workerNodeKey
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      ].getTaskFunctionWorkerUsage(task.name!)!
+      ++taskFunctionWorkerUsage.tasks.executing
+      updateWaitTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        task
+      )
     }
   }
 
-  private updateEluWorkerUsage (
-    workerUsage: WorkerUsage,
+  /**
+   * Hook executed after the worker task execution.
+   * Can be overridden.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param message - The received message.
+   */
+  protected afterTaskExecutionHook (
+    workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
+    let needWorkerChoiceStrategyUpdate = false
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      updateTaskStatisticsWorkerUsage(workerUsage, message)
+      updateRunTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        message
+      )
+      updateEluWorkerUsage(
+        this.workerChoiceStrategyContext,
+        workerUsage,
+        message
+      )
+      needWorkerChoiceStrategyUpdate = true
+    }
     if (
-      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
-        .aggregate
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        message.taskPerformance!.name
+      ) != null
     ) {
-      if (message.taskPerformance?.elu != null) {
-        workerUsage.elu.idle.aggregate =
-          (workerUsage.elu.idle?.aggregate ?? 0) +
-          message.taskPerformance.elu.idle
-        workerUsage.elu.active.aggregate =
-          (workerUsage.elu.active?.aggregate ?? 0) +
-          message.taskPerformance.elu.active
-        if (workerUsage.elu.utilization != null) {
-          workerUsage.elu.utilization =
-            (workerUsage.elu.utilization +
-              message.taskPerformance.elu.utilization) /
-            2
-        } else {
-          workerUsage.elu.utilization = message.taskPerformance.elu.utilization
-        }
-        workerUsage.elu.idle.minimum = Math.min(
-          message.taskPerformance.elu.idle,
-          workerUsage.elu.idle?.minimum ?? Infinity
-        )
-        workerUsage.elu.idle.maximum = Math.max(
-          message.taskPerformance.elu.idle,
-          workerUsage.elu.idle?.maximum ?? -Infinity
-        )
-        workerUsage.elu.active.minimum = Math.min(
-          message.taskPerformance.elu.active,
-          workerUsage.elu.active?.minimum ?? Infinity
-        )
-        workerUsage.elu.active.maximum = Math.max(
-          message.taskPerformance.elu.active,
-          workerUsage.elu.active?.maximum ?? -Infinity
-        )
-        if (
-          this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
-            .average &&
-          workerUsage.tasks.executed !== 0
-        ) {
-          workerUsage.elu.idle.average =
-            workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
-          workerUsage.elu.active.average =
-            workerUsage.elu.active.aggregate / workerUsage.tasks.executed
-        }
-        if (
-          this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
-            .median
-        ) {
-          workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
-          workerUsage.elu.active.history.push(
-            message.taskPerformance.elu.active
-          )
-          workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
-          workerUsage.elu.active.median = median(workerUsage.elu.active.history)
-        }
-      }
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      const taskFunctionWorkerUsage = this.workerNodes[
+        workerNodeKey
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
+      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      updateRunTimeWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        message
+      )
+      updateEluWorkerUsage(
+        this.workerChoiceStrategyContext,
+        taskFunctionWorkerUsage,
+        message
+      )
+      needWorkerChoiceStrategyUpdate = true
+    }
+    if (needWorkerChoiceStrategyUpdate) {
+      this.workerChoiceStrategyContext?.update(workerNodeKey)
     }
   }
 
+  /**
+   * Whether the worker node shall update its task function worker usage or not.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
+   */
+  private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    return (
+      workerInfo != null &&
+      Array.isArray(workerInfo.taskFunctionNames) &&
+      workerInfo.taskFunctionNames.length > 2
+    )
+  }
+
   /**
    * Chooses a worker node for the next task.
    *
    * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
    *
-   * @returns The worker node key
+   * @returns The chosen worker node key
    */
   private chooseWorkerNode (): number {
     if (this.shallCreateDynamicWorker()) {
-      const worker = this.createAndSetupDynamicWorker()
+      const workerNodeKey = this.createAndSetupDynamicWorkerNode()
       if (
-        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+        this.workerChoiceStrategyContext?.getStrategyPolicy()
+          .dynamicWorkerUsage === true
       ) {
-        return this.getWorkerNodeKey(worker)
+        return workerNodeKey
       }
     }
-    return this.workerChoiceStrategyContext.execute()
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    return this.workerChoiceStrategyContext!.execute()
   }
 
   /**
@@ -907,283 +1200,726 @@ export abstract class AbstractPool<
    *
    * @returns Whether to create a dynamic worker or not.
    */
-  private shallCreateDynamicWorker (): boolean {
-    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
-  }
+  protected abstract shallCreateDynamicWorker (): boolean
 
   /**
-   * Sends a message to the given worker.
+   * Sends a message to worker given its worker node key.
    *
-   * @param worker - The worker which should receive the message.
+   * @param workerNodeKey - The worker node key.
    * @param message - The message.
+   * @param transferList - The optional array of transferable objects.
    */
   protected abstract sendToWorker (
-    worker: Worker,
-    message: MessageValue<Data>
+    workerNodeKey: number,
+    message: MessageValue<Data>,
+    transferList?: TransferListItem[]
   ): void
 
   /**
-   * Creates a new worker.
-   *
-   * @returns Newly created worker.
-   */
-  protected abstract createWorker (): Worker
-
-  /**
-   * Creates a new worker and sets it up completely in the pool worker nodes.
+   * Creates a new, completely set up worker node.
    *
-   * @returns New, completely set up worker.
+   * @returns New, completely set up worker node key.
    */
-  protected createAndSetupWorker (): Worker {
-    const worker = this.createWorker()
-
-    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
-    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
-    worker.on('error', error => {
-      const workerNodeKey = this.getWorkerNodeKey(worker)
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
-      workerInfo.ready = false
-      if (this.emitter != null) {
-        this.emitter.emit(PoolEvents.error, error)
-      }
-      if (this.opts.restartWorkerOnError === true && !this.starting) {
-        if (workerInfo.dynamic) {
-          this.createAndSetupDynamicWorker()
+  protected createAndSetupWorkerNode (): number {
+    const workerNode = this.createWorkerNode()
+    workerNode.registerWorkerEventHandler(
+      'online',
+      this.opts.onlineHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler(
+      'message',
+      this.opts.messageHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler(
+      'error',
+      this.opts.errorHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerWorkerEventHandler('error', (error: Error) => {
+      workerNode.info.ready = false
+      this.emitter?.emit(PoolEvents.error, error)
+      if (
+        this.started &&
+        !this.destroying &&
+        this.opts.restartWorkerOnError === true
+      ) {
+        if (workerNode.info.dynamic) {
+          this.createAndSetupDynamicWorkerNode()
         } else {
-          this.createAndSetupWorker()
+          this.createAndSetupWorkerNode()
         }
       }
-      if (this.opts.enableTasksQueue === true) {
-        this.redistributeQueuedTasks(workerNodeKey)
+      if (
+        this.started &&
+        !this.destroying &&
+        this.opts.enableTasksQueue === true
+      ) {
+        this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
+      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+      workerNode?.terminate().catch(error => {
+        this.emitter?.emit(PoolEvents.error, error)
+      })
     })
-    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
-    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
-    worker.once('exit', () => {
-      this.removeWorkerNode(worker)
+    workerNode.registerWorkerEventHandler(
+      'exit',
+      this.opts.exitHandler ?? EMPTY_FUNCTION
+    )
+    workerNode.registerOnceWorkerEventHandler('exit', () => {
+      this.removeWorkerNode(workerNode)
     })
-
-    this.addWorkerNode(worker)
-
-    this.afterWorkerSetup(worker)
-
-    return worker
+    const workerNodeKey = this.addWorkerNode(workerNode)
+    this.afterWorkerNodeSetup(workerNodeKey)
+    return workerNodeKey
   }
 
   /**
-   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+   * Creates a new, completely set up dynamic worker node.
    *
-   * @returns New, completely set up dynamic worker.
+   * @returns New, completely set up dynamic worker node key.
    */
-  protected createAndSetupDynamicWorker (): Worker {
-    const worker = this.createAndSetupWorker()
-    this.registerWorkerMessageListener(worker, message => {
-      const workerNodeKey = this.getWorkerNodeKey(worker)
+  protected createAndSetupDynamicWorkerNode (): number {
+    const workerNodeKey = this.createAndSetupWorkerNode()
+    this.registerWorkerMessageListener(workerNodeKey, message => {
+      this.checkMessageWorkerId(message)
+      const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+        message.workerId
+      )
+      const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
+      // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
-        (message.kill != null &&
+        (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
           ((this.opts.enableTasksQueue === false &&
-            this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
+            workerUsage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
-              this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
-              this.tasksQueueSize(workerNodeKey) === 0)))
+              workerUsage.tasks.executing === 0 &&
+              this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
-        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        void (this.destroyWorker(worker) as Promise<void>)
+        // Flag the worker node as not ready immediately
+        this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
+        this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+          this.emitter?.emit(PoolEvents.error, error)
+        })
       }
     })
-    const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
-    workerInfo.dynamic = true
-    if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
-      workerInfo.ready = true
-    }
-    this.sendToWorker(worker, {
-      checkActive: true,
-      workerId: workerInfo.id as number
+    this.sendToWorker(workerNodeKey, {
+      checkActive: true
     })
-    return worker
+    if (this.taskFunctions.size > 0) {
+      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+          taskFunctionOperation: 'add',
+          taskFunctionName,
+          taskFunction: taskFunction.toString()
+        }).catch(error => {
+          this.emitter?.emit(PoolEvents.error, error)
+        })
+      }
+    }
+    const workerNode = this.workerNodes[workerNodeKey]
+    workerNode.info.dynamic = true
+    if (
+      this.workerChoiceStrategyContext?.getStrategyPolicy()
+        .dynamicWorkerReady === true ||
+      this.workerChoiceStrategyContext?.getStrategyPolicy()
+        .dynamicWorkerUsage === true
+    ) {
+      workerNode.info.ready = true
+    }
+    this.checkAndEmitDynamicWorkerCreationEvents()
+    return workerNodeKey
   }
 
   /**
-   * Registers a listener callback on the given worker.
+   * Registers a listener callback on the worker given its worker node key.
    *
-   * @param worker - The worker which should register a listener.
+   * @param workerNodeKey - The worker node key.
    * @param listener - The message listener callback.
    */
-  private registerWorkerMessageListener<Message extends Data | Response>(
-    worker: Worker,
+  protected abstract registerWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
     listener: (message: MessageValue<Message>) => void
-  ): void {
-    worker.on('message', listener as MessageHandler<Worker>)
-  }
+  ): void
+
+  /**
+   * Registers once a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract registerOnceWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+
+  /**
+   * Deregisters a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract deregisterWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
 
   /**
-   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+   * Method hooked up after a worker node has been newly created.
    * Can be overridden.
    *
-   * @param worker - The newly created worker.
+   * @param workerNodeKey - The newly created worker node key.
    */
-  protected afterWorkerSetup (worker: Worker): void {
+  protected afterWorkerNodeSetup (workerNodeKey: number): void {
     // Listen to worker messages.
-    this.registerWorkerMessageListener(worker, this.workerListener())
-    // Send startup message to worker.
-    this.sendWorkerStartupMessage(worker)
-    // Setup worker task statistics computation.
-    this.setWorkerStatistics(worker)
+    this.registerWorkerMessageListener(
+      workerNodeKey,
+      this.workerMessageListener
+    )
+    // Send the startup message to worker.
+    this.sendStartupMessageToWorker(workerNodeKey)
+    // Send the statistics message to worker.
+    this.sendStatisticsMessageToWorker(workerNodeKey)
+    if (this.opts.enableTasksQueue === true) {
+      if (this.opts.tasksQueueOptions?.taskStealing === true) {
+        this.workerNodes[workerNodeKey].on(
+          'idle',
+          this.handleWorkerNodeIdleEvent
+        )
+      }
+      if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+        this.workerNodes[workerNodeKey].on(
+          'backPressure',
+          this.handleWorkerNodeBackPressureEvent
+        )
+      }
+    }
   }
 
-  private sendWorkerStartupMessage (worker: Worker): void {
-    this.sendToWorker(worker, {
-      ready: false,
-      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+  /**
+   * Sends the startup message to worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   */
+  protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
+
+  /**
+   * Sends the statistics message to worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   */
+  private sendStatisticsMessageToWorker (workerNodeKey: number): void {
+    this.sendToWorker(workerNodeKey, {
+      statistics: {
+        runTime:
+          this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+            .runTime.aggregate ?? false,
+        elu:
+          this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+            .aggregate ?? false
+      }
     })
   }
 
+  private cannotStealTask (): boolean {
+    return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+  }
+
+  private handleTask (workerNodeKey: number, task: Task<Data>): void {
+    if (this.shallExecuteTask(workerNodeKey)) {
+      this.executeTask(workerNodeKey, task)
+    } else {
+      this.enqueueTask(workerNodeKey, task)
+    }
+  }
+
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    if (workerNodeKey === -1 || this.cannotStealTask()) {
+      return
+    }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      let targetWorkerNodeKey: number = workerNodeKey
-      let minQueuedTasks = Infinity
-      for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
-        const workerInfo = this.getWorkerInfo(workerNodeId)
-        if (
-          workerNodeId !== workerNodeKey &&
-          workerInfo.ready &&
-          workerNode.usage.tasks.queued === 0
-        ) {
-          targetWorkerNodeKey = workerNodeId
-          break
-        }
-        if (
-          workerNodeId !== workerNodeKey &&
-          workerInfo.ready &&
-          workerNode.usage.tasks.queued < minQueuedTasks
-        ) {
-          minQueuedTasks = workerNode.usage.tasks.queued
-          targetWorkerNodeKey = workerNodeId
-        }
+      const destinationWorkerNodeKey = this.workerNodes.reduce(
+        (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
+          return workerNode.info.ready &&
+            workerNode.usage.tasks.queued <
+              workerNodes[minWorkerNodeKey].usage.tasks.queued
+            ? workerNodeKey
+            : minWorkerNodeKey
+        },
+        0
+      )
+      this.handleTask(
+        destinationWorkerNodeKey,
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.dequeueTask(workerNodeKey)!
+      )
+    }
+  }
+
+  private updateTaskStolenStatisticsWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (workerNode?.usage != null) {
+      ++workerNode.usage.tasks.stolen
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage =
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        workerNode.getTaskFunctionWorkerUsage(taskName)!
+      ++taskFunctionWorkerUsage.tasks.stolen
+    }
+  }
+
+  private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+    workerNodeKey: number
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (workerNode?.usage != null) {
+      ++workerNode.usage.tasks.sequentiallyStolen
+    }
+  }
+
+  private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage =
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        workerNode.getTaskFunctionWorkerUsage(taskName)!
+      ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+    }
+  }
+
+  private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+    workerNodeKey: number
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    if (workerNode?.usage != null) {
+      workerNode.usage.tasks.sequentiallyStolen = 0
+    }
+  }
+
+  private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage =
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        workerNode.getTaskFunctionWorkerUsage(taskName)!
+      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+    }
+  }
+
+  private readonly handleWorkerNodeIdleEvent = (
+    eventDetail: WorkerNodeEventDetail,
+    previousStolenTask?: Task<Data>
+  ): void => {
+    const { workerNodeKey } = eventDetail
+    if (workerNodeKey == null) {
+      throw new Error(
+        "WorkerNode event detail 'workerNodeKey' property must be defined"
+      )
+    }
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes ?? 0) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
+      if (workerInfo != null && previousStolenTask != null) {
+        workerInfo.stealing = false
       }
-      this.enqueueTask(
-        targetWorkerNodeKey,
-        this.dequeueTask(workerNodeKey) as Task<Data>
+      return
+    }
+    const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+    if (
+      workerInfo != null &&
+      previousStolenTask != null &&
+      workerNodeTasksUsage.sequentiallyStolen > 0 &&
+      (workerNodeTasksUsage.executing > 0 ||
+        this.tasksQueueSize(workerNodeKey) > 0)
+    ) {
+      workerInfo.stealing = false
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      for (const taskName of this.workerNodes[workerNodeKey].info
+        .taskFunctionNames!) {
+        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          taskName
+        )
+      }
+      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+      return
+    }
+    if (workerInfo == null) {
+      throw new Error(
+        `Worker node with key '${workerNodeKey}' not found in pool`
+      )
+    }
+    workerInfo.stealing = true
+    const stolenTask = this.workerNodeStealTask(workerNodeKey)
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      stolenTask != null
+    ) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      const taskFunctionTasksWorkerUsage = this.workerNodes[
+        workerNodeKey
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
+      if (
+        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+        (previousStolenTask != null &&
+          previousStolenTask.name === stolenTask.name &&
+          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+      ) {
+        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          stolenTask.name!
+        )
+      } else {
+        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+          workerNodeKey,
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          stolenTask.name!
+        )
+      }
+    }
+    sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+      .then(() => {
+        this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
+        return undefined
+      })
+      .catch(error => {
+        this.emitter?.emit(PoolEvents.error, error)
+      })
+  }
+
+  private readonly workerNodeStealTask = (
+    workerNodeKey: number
+  ): Task<Data> | undefined => {
+    const workerNodes = this.workerNodes
+      .slice()
+      .sort(
+        (workerNodeA, workerNodeB) =>
+          workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
+    const sourceWorkerNode = workerNodes.find(
+      (sourceWorkerNode, sourceWorkerNodeKey) =>
+        sourceWorkerNode.info.ready &&
+        !sourceWorkerNode.info.stealing &&
+        sourceWorkerNodeKey !== workerNodeKey &&
+        sourceWorkerNode.usage.tasks.queued > 0
+    )
+    if (sourceWorkerNode != null) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      const task = sourceWorkerNode.popTask()!
+      this.handleTask(workerNodeKey, task)
+      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+      return task
+    }
+  }
+
+  private readonly handleWorkerNodeBackPressureEvent = (
+    eventDetail: WorkerNodeEventDetail
+  ): void => {
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes ?? 0) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
+      return
+    }
+    const { workerId } = eventDetail
+    const sizeOffset = 1
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
+      return
+    }
+    const sourceWorkerNode =
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    const workerNodes = this.workerNodes
+      .slice()
+      .sort(
+        (workerNodeA, workerNodeB) =>
+          workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+      )
+    for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+      if (
+        sourceWorkerNode.usage.tasks.queued > 0 &&
+        workerNode.info.ready &&
+        !workerNode.info.stealing &&
+        workerNode.info.id !== workerId &&
+        workerNode.usage.tasks.queued <
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          this.opts.tasksQueueOptions!.size! - sizeOffset
+      ) {
+        const workerInfo = this.getWorkerInfo(workerNodeKey)
+        if (workerInfo == null) {
+          throw new Error(
+            `Worker node with key '${workerNodeKey}' not found in pool`
+          )
+        }
+        workerInfo.stealing = true
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        const task = sourceWorkerNode.popTask()!
+        this.handleTask(workerNodeKey, task)
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+        workerInfo.stealing = false
+      }
     }
   }
 
   /**
-   * This function is the listener registered for each worker message.
-   *
-   * @returns The listener function to execute when a message is received from a worker.
+   * This method is the message listener registered on each worker.
    */
-  protected workerListener (): (message: MessageValue<Response>) => void {
-    return message => {
-      this.checkMessageWorkerId(message)
-      if (message.ready != null) {
-        // Worker ready response received
-        this.handleWorkerReadyResponse(message)
-      } else if (message.id != null) {
-        // Task execution response received
-        this.handleTaskExecutionResponse(message)
+  protected readonly workerMessageListener = (
+    message: MessageValue<Response>
+  ): void => {
+    this.checkMessageWorkerId(message)
+    const { workerId, ready, taskId, taskFunctionNames } = message
+    if (ready != null && taskFunctionNames != null) {
+      // Worker ready response received from worker
+      this.handleWorkerReadyResponse(message)
+    } else if (taskId != null) {
+      // Task execution response received from worker
+      this.handleTaskExecutionResponse(message)
+    } else if (taskFunctionNames != null) {
+      // Task function names message received from worker
+      const workerInfo = this.getWorkerInfo(
+        this.getWorkerNodeKeyByWorkerId(workerId)
+      )
+      if (workerInfo != null) {
+        workerInfo.taskFunctionNames = taskFunctionNames
       }
     }
   }
 
+  private checkAndEmitReadyEvent (): void {
+    if (!this.readyEventEmitted && this.ready) {
+      this.emitter?.emit(PoolEvents.ready, this.info)
+      this.readyEventEmitted = true
+    }
+  }
+
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    const worker = this.getWorkerById(message.workerId)
-    this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
-      message.ready as boolean
-    if (this.emitter != null && this.ready) {
-      this.emitter.emit(PoolEvents.ready, this.info)
+    const { workerId, ready, taskFunctionNames } = message
+    if (ready == null || !ready) {
+      throw new Error(`Worker ${workerId} failed to initialize`)
     }
+    const workerNode =
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    workerNode.info.ready = ready
+    workerNode.info.taskFunctionNames = taskFunctionNames
+    this.checkAndEmitReadyEvent()
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(message.id as string)
+    const { workerId, taskId, workerError, data } = message
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    const promiseResponse = this.promiseResponseMap.get(taskId!)
     if (promiseResponse != null) {
-      if (message.taskError != null) {
-        if (this.emitter != null) {
-          this.emitter.emit(PoolEvents.taskError, message.taskError)
-        }
-        promiseResponse.reject(message.taskError.message)
+      const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+      const workerNode = this.workerNodes[workerNodeKey]
+      if (workerError != null) {
+        this.emitter?.emit(PoolEvents.taskError, workerError)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(
+            reject,
+            this.emitter,
+            workerError.message
+          )
+          : reject(workerError.message)
       } else {
-        promiseResponse.resolve(message.data as Response)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+          : resolve(data as Response)
       }
-      this.afterTaskExecutionHook(promiseResponse.worker, message)
-      this.promiseResponseMap.delete(message.id as string)
-      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+      asyncResource?.emitDestroy()
+      this.afterTaskExecutionHook(workerNodeKey, message)
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.promiseResponseMap.delete(taskId!)
+      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+      workerNode?.emit('taskFinished', taskId)
       if (
         this.opts.enableTasksQueue === true &&
-        this.tasksQueueSize(workerNodeKey) > 0
+        !this.destroying &&
+        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+        workerNode != null
       ) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+        const workerNodeTasksUsage = workerNode.usage.tasks
+        if (
+          this.tasksQueueSize(workerNodeKey) > 0 &&
+          workerNodeTasksUsage.executing <
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            this.opts.tasksQueueOptions!.concurrency!
+        ) {
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
+        }
+        if (
+          workerNodeTasksUsage.executing === 0 &&
+          this.tasksQueueSize(workerNodeKey) === 0 &&
+          workerNodeTasksUsage.sequentiallyStolen === 0
+        ) {
+          workerNode.emit('idle', {
+            workerId,
+            workerNodeKey
+          })
+        }
       }
-      this.workerChoiceStrategyContext.update(workerNodeKey)
     }
   }
 
-  private checkAndEmitEvents (): void {
-    if (this.emitter != null) {
-      if (this.busy) {
-        this.emitter.emit(PoolEvents.busy, this.info)
-      }
-      if (this.type === PoolTypes.dynamic && this.full) {
-        this.emitter.emit(PoolEvents.full, this.info)
-      }
+  private checkAndEmitTaskExecutionEvents (): void {
+    if (this.busy) {
+      this.emitter?.emit(PoolEvents.busy, this.info)
     }
   }
 
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    }
+  }
+
+  /**
+   * Emits dynamic worker creation events.
+   */
+  protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
+
   /**
-   * Gets the worker information.
+   * Gets the worker information given its worker node key.
    *
    * @param workerNodeKey - The worker node key.
+   * @returns The worker information.
    */
-  private getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    return this.workerNodes[workerNodeKey].info
+  protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**
-   * Adds the given worker in the pool worker nodes.
+   * Creates a worker node.
    *
-   * @param worker - The worker.
-   * @returns The worker nodes length.
+   * @returns The created worker node.
    */
-  private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+  private createWorkerNode (): IWorkerNode<Worker, Data> {
+    const workerNode = new WorkerNode<Worker, Data>(
+      this.worker,
+      this.filePath,
+      {
+        env: this.opts.env,
+        workerOptions: this.opts.workerOptions,
+        tasksQueueBackPressureSize:
+          this.opts.tasksQueueOptions?.size ??
+          getDefaultTasksQueueOptions(
+            this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+          ).size
+      }
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
     }
-    return this.workerNodes.push(workerNode)
+    return workerNode
+  }
+
+  /**
+   * Adds the given worker node in the pool worker nodes.
+   *
+   * @param workerNode - The worker node.
+   * @returns The added worker node key.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+   */
+  private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
+    this.workerNodes.push(workerNode)
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
+    if (workerNodeKey === -1) {
+      throw new Error('Worker added not found in worker nodes')
+    }
+    return workerNodeKey
+  }
+
+  private checkAndEmitEmptyEvent (): void {
+    if (this.empty) {
+      this.emitter?.emit(PoolEvents.empty, this.info)
+      this.readyEventEmitted = false
+    }
   }
 
   /**
-   * Removes the given worker from the pool worker nodes.
+   * Removes the worker node from the pool worker nodes.
    *
-   * @param worker - The worker.
+   * @param workerNode - The worker node.
    */
-  private removeWorkerNode (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
+  private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
-      this.workerChoiceStrategyContext.remove(workerNodeKey)
+      this.workerChoiceStrategyContext?.remove(workerNodeKey)
     }
+    this.checkAndEmitEmptyEvent()
+  }
+
+  protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    if (workerInfo != null) {
+      workerInfo.ready = false
+    }
+  }
+
+  private hasBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.findIndex(
+        workerNode => !workerNode.hasBackPressure()
+      ) === -1
+    )
   }
 
   /**
-   * Executes the given task on the given worker.
+   * Executes the given task on the worker given its worker node key.
    *
-   * @param worker - The worker.
+   * @param workerNodeKey - The worker node key.
    * @param task - The task to execute.
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
+    this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    this.checkAndEmitTaskQueuingEvents()
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
@@ -1194,14 +1930,15 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  private flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): number {
+    let flushedTasks = 0
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      this.executeTask(
-        workerNodeKey,
-        this.dequeueTask(workerNodeKey) as Task<Data>
-      )
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
+      ++flushedTasks
     }
     this.workerNodes[workerNodeKey].clearTasksQueue()
+    return flushedTasks
   }
 
   private flushTasksQueues (): void {
@@ -1209,17 +1946,4 @@ export abstract class AbstractPool<
       this.flushTasksQueue(workerNodeKey)
     }
   }
-
-  private setWorkerStatistics (worker: Worker): void {
-    this.sendToWorker(worker, {
-      statistics: {
-        runTime:
-          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-            .runTime.aggregate,
-        elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
-          .elu.aggregate
-      },
-      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
-    })
-  }
 }