feat: add new pool options
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Sep 2023 18:48:42 +0000 (20:48 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Sep 2023 18:48:42 +0000 (20:48 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
benchmarks/internal/bench.mjs
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
tests/pools/abstract/abstract-pool.test.js

index 87f4a8667423181d8eeaf7206e83b56acc651d6c..6fec036fbc1d8cdb0d34e5f1a5d16d097bff441e 100644 (file)
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+- Add `tasksStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable tasks stealing or not and whether enable tasks stealing on back pressure or not.
 - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
 
 ## [2.6.44] - 2023-09-08
index 23ab0625807e349b89f6fabd55be766e45f9fff5..258854f864702ecf92816e151b33d14f38c96840 100644 (file)
@@ -13,35 +13,52 @@ import {
   runPoolifierTest
 } from '../benchmarks-utils.mjs'
 
+const poolifierSuite = new Benchmark.Suite('Poolifier', {
+  onCycle: event => {
+    console.info(event.target.toString())
+  },
+  onComplete: function () {
+    console.info(
+      'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
+    )
+  }
+})
+
 const poolSize = availableParallelism()
-const pools = []
+const benchmarkSettings = []
 for (const poolType of Object.values(PoolTypes)) {
   for (const workerType of Object.values(WorkerTypes)) {
     if (workerType === WorkerTypes.cluster) {
       continue
     }
     for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
-      for (const enableTasksQueue of [false]) {
+      for (const enableTasksQueue of [false, true]) {
         if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
           for (const measurement of [Measurements.runTime, Measurements.elu]) {
-            pools.push([
+            benchmarkSettings.push([
               `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}|measurement:${measurement}`,
-              buildPoolifierPool(workerType, poolType, poolSize, {
+              workerType,
+              poolType,
+              poolSize,
+              {
                 workerChoiceStrategy,
                 workerChoiceStrategyOptions: {
                   measurement
                 },
                 enableTasksQueue
-              })
+              }
             ])
           }
         } else {
-          pools.push([
+          benchmarkSettings.push([
             `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}`,
-            buildPoolifierPool(workerType, poolType, poolSize, {
+            workerType,
+            poolType,
+            poolSize,
+            {
               workerChoiceStrategy,
               enableTasksQueue
-            })
+            }
           ])
         }
       }
@@ -55,25 +72,21 @@ const workerData = {
   taskSize: 100
 }
 
-const suite = new Benchmark.Suite('Poolifier')
-for (const [name, pool] of pools) {
-  suite.add(name, async () => {
+for (const [
+  name,
+  workerType,
+  poolType,
+  poolSize,
+  poolOptions
+] of benchmarkSettings) {
+  poolifierSuite.add(name, async () => {
+    const pool = buildPoolifierPool(workerType, poolType, poolSize, poolOptions)
     await runPoolifierTest(pool, {
       taskExecutions,
       workerData
     })
+    await pool.destroy()
   })
 }
 
-suite
-  .on('cycle', event => {
-    console.info(event.target.toString())
-  })
-  .on('complete', function () {
-    console.info(
-      'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
-    )
-    // eslint-disable-next-line n/no-process-exit
-    process.exit()
-  })
-  .run({ async: true, maxTime: 120 })
+poolifierSuite.run({ async: true })
index 0713392631eda5f48a37719c1b07b055c6ce6db2..5a5ffd49bf5c05c4ae70a091d22c0fc9a03729c0 100644 (file)
@@ -6,6 +6,7 @@
   - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
   - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
   - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
+  - [`pool.start()`](#poolstart)
   - [`pool.destroy()`](#pooldestroy)
   - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
   - [`PoolOptions`](#pooloptions)
 
 This method is available on both pool implementations and returns a promise with the task function execution response.
 
+### `pool.start()`
+
+This method is available on both pool implementations and will start the minimum number of workers.
+
 ### `pool.destroy()`
 
 This method is available on both pool implementations and will call the terminate method on each worker.
@@ -58,6 +63,8 @@ An object with these properties:
 - `messageHandler` (optional) - A function that will listen for message event on each worker
 - `errorHandler` (optional) - A function that will listen for error event on each worker
 - `exitHandler` (optional) - A function that will listen for exit event on each worker
+- `startWorkers` (optional) - Start the minimum number of workers at pool creation.  
+  Default: `true`
 - `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
 
   - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
@@ -95,8 +102,10 @@ An object with these properties:
 
   - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
   - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
+  - `tasksStealing` (optional) - Tasks stealing enablement.
+  - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure.
 
-  Default: `{ size: (pool maximum size)^2, concurrency: 1 }`
+  Default: `{ size: (pool maximum size)^2, concurrency: 1, tasksStealing: true, tasksStealingOnBackPressure: true }`
 
 #### `ThreadPoolOptions extends PoolOptions`
 
index 6457b1554ddbf5bfeb0156a0d40e9bb108dd02dc..69df163bb5775d7ee1c38fe2c8ebd4594ced165e 100644 (file)
@@ -92,14 +92,14 @@ export abstract class AbstractPool<
    */
   protected readonly max?: number
 
-  /**
-   * Whether the pool is starting or not.
-   */
-  private readonly starting: boolean
   /**
    * Whether the pool is started or not.
    */
   private started: boolean
+  /**
+   * Whether the pool is starting or not.
+   */
+  private starting: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -145,10 +145,11 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
-    this.starting = true
-    this.startPool()
+    this.started = false
     this.starting = false
-    this.started = true
+    if (this.opts.startWorkers === true) {
+      this.start()
+    }
 
     this.startTimestamp = performance.now()
   }
@@ -212,6 +213,7 @@ export abstract class AbstractPool<
 
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
     if (isPlainObject(opts)) {
+      this.opts.startWorkers = opts.startWorkers ?? true
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
@@ -314,11 +316,6 @@ export abstract class AbstractPool<
         `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
       )
     }
-    if (tasksQueueOptions?.queueMaxSize != null) {
-      throw new Error(
-        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-      )
-    }
     if (
       tasksQueueOptions?.size != null &&
       !Number.isSafeInteger(tasksQueueOptions?.size)
@@ -334,24 +331,13 @@ export abstract class AbstractPool<
     }
   }
 
-  private startPool (): void {
-    while (
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-        0
-      ) < this.numberOfWorkers
-    ) {
-      this.createAndSetupWorkerNode()
-    }
-  }
-
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
       version,
       type: this.type,
       worker: this.worker,
+      started: this.started,
       ready: this.ready,
       strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
       minSize: this.minSize,
@@ -675,7 +661,9 @@ export abstract class AbstractPool<
     return {
       ...{
         size: Math.pow(this.maxSize, 2),
-        concurrency: 1
+        concurrency: 1,
+        tasksStealing: true,
+        tasksStealingOnBackPressure: true
       },
       ...tasksQueueOptions
     }
@@ -751,7 +739,7 @@ export abstract class AbstractPool<
   ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
       if (!this.started) {
-        reject(new Error('Cannot execute a task on destroyed pool'))
+        reject(new Error('Cannot execute a task on not started pool'))
         return
       }
       if (name != null && typeof name !== 'string') {
@@ -798,6 +786,22 @@ export abstract class AbstractPool<
     })
   }
 
+  /** @inheritdoc */
+  public start (): void {
+    this.starting = true
+    while (
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+        0
+      ) < this.numberOfWorkers
+    ) {
+      this.createAndSetupWorkerNode()
+    }
+    this.starting = false
+    this.started = true
+  }
+
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
@@ -1166,10 +1170,14 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
-      this.workerNodes[workerNodeKey].onEmptyQueue =
-        this.taskStealingOnEmptyQueue.bind(this)
-      this.workerNodes[workerNodeKey].onBackPressure =
-        this.tasksStealingOnBackPressure.bind(this)
+      if (this.opts.tasksQueueOptions?.tasksStealing === true) {
+        this.workerNodes[workerNodeKey].onEmptyQueue =
+          this.taskStealingOnEmptyQueue.bind(this)
+      }
+      if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+        this.workerNodes[workerNodeKey].onBackPressure =
+          this.tasksStealingOnBackPressure.bind(this)
+      }
     }
   }
 
index 7a10373393786ac56c0e7f9a98b69f81639cbbe1..f76be0c0d61bb04632439cf4d96ef6f5959e0e72 100644 (file)
@@ -63,6 +63,7 @@ export interface PoolInfo {
   readonly version: string
   readonly type: PoolType
   readonly worker: WorkerType
+  readonly started: boolean
   readonly ready: boolean
   readonly strategy: WorkerChoiceStrategy
   readonly minSize: number
@@ -106,16 +107,24 @@ export interface TasksQueueOptions {
    * @defaultValue (pool maximum size)^2
    */
   readonly size?: number
-  /**
-   * @deprecated Use `size` instead.
-   */
-  readonly queueMaxSize?: number
   /**
    * Maximum number of tasks that can be executed concurrently on a worker node.
    *
    * @defaultValue 1
    */
   readonly concurrency?: number
+  /**
+   * Whether to enable tasks stealing.
+   *
+   * @defaultValue true
+   */
+  readonly tasksStealing?: boolean
+  /**
+   * Whether to enable tasks stealing on back pressure.
+   *
+   * @defaultValue true
+   */
+  readonly tasksStealingOnBackPressure?: boolean
 }
 
 /**
@@ -140,6 +149,12 @@ export interface PoolOptions<Worker extends IWorker> {
    * A function that will listen for exit event on each worker.
    */
   exitHandler?: ExitHandler<Worker>
+  /**
+   * Whether to start the minimum number of workers at pool initialization.
+   *
+   * @defaultValue false
+   */
+  startWorkers?: boolean
   /**
    * The worker choice strategy to use in this pool.
    *
@@ -229,6 +244,10 @@ export interface IPool<
     name?: string,
     transferList?: TransferListItem[]
   ) => Promise<Response>
+  /**
+   * Starts the minimum number of workers in this pool.
+   */
+  readonly start: () => void
   /**
    * Terminates all workers in this pool.
    */
index ca275dedab806b9a4cc2c9c6aee5d3cf550b99f5..68c8cab2ef97070ccea235dfd391ed5479978120 100644 (file)
@@ -46,6 +46,7 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public onEmptyQueue?: WorkerNodeEventCallback
   private readonly tasksQueue: Deque<Task<Data>>
+  private onBackPressureStarted: boolean
   private onEmptyQueueCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
@@ -65,6 +66,7 @@ implements IWorkerNode<Worker, Data> {
     }
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
+    this.onBackPressureStarted = false
     this.onEmptyQueueCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
@@ -77,8 +79,14 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.push(task)
-    if (this.onBackPressure != null && this.hasBackPressure()) {
+    if (
+      this.onBackPressure != null &&
+      this.hasBackPressure() &&
+      !this.onBackPressureStarted
+    ) {
+      this.onBackPressureStarted = true
       this.onBackPressure(this.info.id as number)
+      this.onBackPressureStarted = false
     }
     return tasksQueueSize
   }
@@ -86,8 +94,14 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public unshiftTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.unshift(task)
-    if (this.onBackPressure != null && this.hasBackPressure()) {
+    if (
+      this.onBackPressure != null &&
+      this.hasBackPressure() &&
+      !this.onBackPressureStarted
+    ) {
+      this.onBackPressureStarted = true
       this.onBackPressure(this.info.id as number)
+      this.onBackPressureStarted = false
     }
     return tasksQueueSize
   }
@@ -95,7 +109,11 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
     const task = this.tasksQueue.shift()
-    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+    if (
+      this.onEmptyQueue != null &&
+      this.tasksQueue.size === 0 &&
+      this.onEmptyQueueCount === 0
+    ) {
       this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
@@ -104,7 +122,11 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
     const task = this.tasksQueue.pop()
-    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+    if (
+      this.onEmptyQueue != null &&
+      this.tasksQueue.size === 0 &&
+      this.onEmptyQueueCount === 0
+    ) {
       this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
@@ -169,8 +191,8 @@ implements IWorkerNode<Worker, Data> {
       this.onEmptyQueueCount = 0
       return
     }
-    (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
     ++this.onEmptyQueueCount
+    this.onEmptyQueue?.(this.info.id as number)
     await sleep(exponentialDelay(this.onEmptyQueueCount))
     await this.startOnEmptyQueue()
   }
index bedb1c6c42a9121697990121185c049c26ed87a4..df109fe1982e4c511e40a73b9e5449683a18ae00 100644 (file)
@@ -16,6 +16,7 @@ const { Deque } = require('../../../lib/deque')
 const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
 const { version } = require('../../../package.json')
 const { waitPoolEvents } = require('../../test-utils')
+const { WorkerNode } = require('../../../lib/pools/worker-node')
 
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 2
@@ -180,18 +181,18 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.js'
     )
     expect(pool.emitter).toBeInstanceOf(EventEmitter)
-    expect(pool.opts.enableEvents).toBe(true)
-    expect(pool.opts.restartWorkerOnError).toBe(true)
-    expect(pool.opts.enableTasksQueue).toBe(false)
-    expect(pool.opts.tasksQueueOptions).toBeUndefined()
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      retries: 6,
-      runTime: { median: false },
-      waitTime: { median: false },
-      elu: { median: false }
+    expect(pool.opts).toStrictEqual({
+      startWorkers: true,
+      enableEvents: true,
+      restartWorkerOnError: true,
+      enableTasksQueue: false,
+      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+      workerChoiceStrategyOptions: {
+        retries: 6,
+        runTime: { median: false },
+        waitTime: { median: false },
+        elu: { median: false }
+      }
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
       retries: 6,
@@ -208,10 +209,6 @@ describe('Abstract pool test suite', () => {
         elu: { median: false }
       })
     }
-    expect(pool.opts.messageHandler).toBeUndefined()
-    expect(pool.opts.errorHandler).toBeUndefined()
-    expect(pool.opts.onlineHandler).toBeUndefined()
-    expect(pool.opts.exitHandler).toBeUndefined()
     await pool.destroy()
     const testHandler = () => console.info('test handler executed')
     pool = new FixedThreadPool(
@@ -234,22 +231,29 @@ describe('Abstract pool test suite', () => {
       }
     )
     expect(pool.emitter).toBeUndefined()
-    expect(pool.opts.enableEvents).toBe(false)
-    expect(pool.opts.restartWorkerOnError).toBe(false)
-    expect(pool.opts.enableTasksQueue).toBe(true)
-    expect(pool.opts.tasksQueueOptions).toStrictEqual({
-      concurrency: 2,
-      size: 4
-    })
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LEAST_USED
-    )
-    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
-      retries: 6,
-      runTime: { median: true },
-      waitTime: { median: false },
-      elu: { median: false },
-      weights: { 0: 300, 1: 200 }
+    expect(pool.opts).toStrictEqual({
+      startWorkers: true,
+      enableEvents: false,
+      restartWorkerOnError: false,
+      enableTasksQueue: true,
+      tasksQueueOptions: {
+        concurrency: 2,
+        size: 4,
+        tasksStealing: true,
+        tasksStealingOnBackPressure: true
+      },
+      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
+      workerChoiceStrategyOptions: {
+        retries: 6,
+        runTime: { median: true },
+        waitTime: { median: false },
+        elu: { median: false },
+        weights: { 0: 300, 1: 200 }
+      },
+      onlineHandler: testHandler,
+      messageHandler: testHandler,
+      errorHandler: testHandler,
+      exitHandler: testHandler
     })
     expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
       retries: 6,
@@ -268,10 +272,6 @@ describe('Abstract pool test suite', () => {
         weights: { 0: 300, 1: 200 }
       })
     }
-    expect(pool.opts.messageHandler).toStrictEqual(testHandler)
-    expect(pool.opts.errorHandler).toStrictEqual(testHandler)
-    expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
-    expect(pool.opts.exitHandler).toStrictEqual(testHandler)
     await pool.destroy()
   })
 
@@ -404,21 +404,6 @@ describe('Abstract pool test suite', () => {
     ).toThrowError(
       new TypeError('Invalid worker node tasks concurrency: must be an integer')
     )
-    expect(
-      () =>
-        new FixedThreadPool(
-          numberOfWorkers,
-          './tests/worker-files/thread/testWorker.js',
-          {
-            enableTasksQueue: true,
-            tasksQueueOptions: { queueMaxSize: 2 }
-          }
-        )
-    ).toThrowError(
-      new Error(
-        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-      )
-    )
     expect(
       () =>
         new FixedThreadPool(
@@ -649,13 +634,17 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: 4
+      size: 4,
+      tasksStealing: true,
+      tasksStealingOnBackPressure: true
     })
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
-      size: 4
+      size: 4,
+      tasksStealing: true,
+      tasksStealingOnBackPressure: true
     })
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
@@ -671,12 +660,16 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: 4
+      size: 4,
+      tasksStealing: true,
+      tasksStealingOnBackPressure: true
     })
     pool.setTasksQueueOptions({ concurrency: 2 })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
-      size: 4
+      size: 4,
+      tasksStealing: true,
+      tasksStealingOnBackPressure: true
     })
     expect(() =>
       pool.setTasksQueueOptions('invalidTasksQueueOptions')
@@ -696,11 +689,6 @@ describe('Abstract pool test suite', () => {
     expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
       new TypeError('Invalid worker node tasks concurrency: must be an integer')
     )
-    expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError(
-      new Error(
-        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-      )
-    )
     expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
       new RangeError(
         'Invalid worker node tasks queue size: 0 is a negative integer or zero'
@@ -726,6 +714,7 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.fixed,
       worker: WorkerTypes.thread,
+      started: true,
       ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: numberOfWorkers,
@@ -747,6 +736,7 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.dynamic,
       worker: WorkerTypes.cluster,
+      started: true,
       ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: Math.floor(numberOfWorkers / 2),
@@ -767,6 +757,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/cluster/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode).toBeInstanceOf(WorkerNode)
       expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: 0,
@@ -801,7 +792,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/cluster/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.tasksQueue).toBeDefined()
+      expect(workerNode).toBeInstanceOf(WorkerNode)
       expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
@@ -813,7 +804,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.tasksQueue).toBeDefined()
+      expect(workerNode).toBeInstanceOf(WorkerNode)
       expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
@@ -827,6 +818,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/cluster/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode).toBeInstanceOf(WorkerNode)
       expect(workerNode.info).toStrictEqual({
         id: expect.any(Number),
         type: WorkerTypes.cluster,
@@ -841,6 +833,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/thread/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode).toBeInstanceOf(WorkerNode)
       expect(workerNode.info).toStrictEqual({
         id: expect.any(Number),
         type: WorkerTypes.thread,
@@ -851,6 +844,30 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify that pool can be started after initialization', async () => {
+    const pool = new FixedClusterPool(
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js',
+      {
+        startWorkers: false
+      }
+    )
+    expect(pool.info.started).toBe(false)
+    expect(pool.info.ready).toBe(false)
+    expect(pool.workerNodes).toStrictEqual([])
+    await expect(pool.execute()).rejects.toThrowError(
+      new Error('Cannot execute a task on not started pool')
+    )
+    pool.start()
+    expect(pool.info.started).toBe(true)
+    expect(pool.info.ready).toBe(true)
+    expect(pool.workerNodes.length).toBe(numberOfWorkers)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode).toBeInstanceOf(WorkerNode)
+    }
+    await pool.destroy()
+  })
+
   it('Verify that pool execute() arguments are checked', async () => {
     const pool = new FixedClusterPool(
       numberOfWorkers,
@@ -869,8 +886,8 @@ describe('Abstract pool test suite', () => {
       "Task function 'unknown' not found"
     )
     await pool.destroy()
-    await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
-      new Error('Cannot execute a task on destroyed pool')
+    await expect(pool.execute()).rejects.toThrowError(
+      new Error('Cannot execute a task on not started pool')
     )
   })
 
@@ -1038,6 +1055,7 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.dynamic,
       worker: WorkerTypes.cluster,
+      started: true,
       ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
@@ -1075,7 +1093,8 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.fixed,
       worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      started: true,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -1111,7 +1130,8 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.dynamic,
       worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      started: true,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),
@@ -1150,7 +1170,8 @@ describe('Abstract pool test suite', () => {
       version,
       type: PoolTypes.fixed,
       worker: WorkerTypes.thread,
-      ready: expect.any(Boolean),
+      started: true,
+      ready: true,
       strategy: WorkerChoiceStrategies.ROUND_ROBIN,
       minSize: expect.any(Number),
       maxSize: expect.any(Number),