fix: fix continuous tasks stealing on idle start at worker node idling
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 24 Nov 2023 23:51:49 +0000 (00:51 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 24 Nov 2023 23:51:49 +0000 (00:51 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
pnpm-lock.yaml
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/selection-strategies/selection-strategies.test.mjs
tests/pools/thread/fixed.test.mjs
tests/pools/worker-node.test.mjs

index 49935144bd9349bc0e177271fc55586eb95e560e..bb18720ddb35adb16648735667bf3bd6c5859dec 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure continuous tasks stealing on idle start at worker node idling
+
 ## [3.0.7] - 2023-11-24
 
 ### Changed
index bf018251c4c7ab20387ce026d181435fae50da52..a5b81c72eb74436a9b20a000b2888348f3bb154b 100644 (file)
@@ -1813,8 +1813,8 @@ packages:
       restore-cursor: 4.0.0
     dev: true
 
-  /cli-spinners@2.9.1:
-    resolution: {integrity: sha512-jHgecW0pxkonBJdrKsqxgRX9AcG+u/5k0Q7WPDfi8AogLAdwxEkyYYNWwZ5GvVFoFx2uiY1eNcSK00fh+1+FyQ==}
+  /cli-spinners@2.9.2:
+    resolution: {integrity: sha512-ywqV+5MmyL4E7ybXgKys4DugZbX0FC6LnwrhjuykIjnK9k8OQacQ7axGKnjDXWNhns0xot3bZI5h55H8yo9cJg==}
     engines: {node: '>=6'}
     dev: true
 
@@ -4884,7 +4884,7 @@ packages:
       bl: 4.1.0
       chalk: 4.1.2
       cli-cursor: 3.1.0
-      cli-spinners: 2.9.1
+      cli-spinners: 2.9.2
       is-interactive: 1.0.0
       is-unicode-supported: 0.1.0
       log-symbols: 4.1.0
@@ -4898,7 +4898,7 @@ packages:
     dependencies:
       chalk: 5.3.0
       cli-cursor: 4.0.0
-      cli-spinners: 2.9.1
+      cli-spinners: 2.9.2
       is-interactive: 2.0.0
       is-unicode-supported: 1.3.0
       log-symbols: 5.1.0
index 8c2dcc31d2e849c0fdbf4e0ed9d8bfa297c585de..ab319c627884c25c4851dc34ecb2626ebc528fc3 100644 (file)
@@ -12,12 +12,14 @@ import {
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
   average,
+  exponentialDelay,
   isKillBehavior,
   isPlainObject,
   max,
   median,
   min,
-  round
+  round,
+  sleep
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
 import type { TaskFunction } from '../worker/task-functions'
@@ -1478,11 +1480,79 @@ export abstract class AbstractPool<
     }
   }
 
+  private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (workerNode?.usage != null) {
+      ++workerNode.usage.tasks.sequentiallyStolen
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+        taskName
+      ) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+    }
+  }
+
+  private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+    workerNodeKey: number,
+    taskName: string
+  ): void {
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (workerNode?.usage != null) {
+      workerNode.usage.tasks.sequentiallyStolen = 0
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      workerNode.getTaskFunctionWorkerUsage(taskName) != null
+    ) {
+      const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+        taskName
+      ) as WorkerUsage
+      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+    }
+  }
+
   private readonly handleIdleWorkerNodeEvent = (
-    event: CustomEvent<WorkerNodeEventDetail>
+    event: CustomEvent<WorkerNodeEventDetail>,
+    previousStolenTask?: Task<Data>
   ): void => {
-    const { workerId } = event.detail
-    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const { workerNodeKey } = event.detail
+    if (workerNodeKey == null) {
+      throw new Error(
+        'WorkerNode event detail workerNodeKey attribute must be defined'
+      )
+    }
+    const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+    if (
+      previousStolenTask != null &&
+      workerNodeTasksUsage.sequentiallyStolen > 0 &&
+      (workerNodeTasksUsage.executing > 0 ||
+        this.tasksQueueSize(workerNodeKey) > 0)
+    ) {
+      this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
+        previousStolenTask.name as string
+      )
+      return
+    }
+    const stolenTask = this.workerNodeStealTask(workerNodeKey)
+    sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+      .then(() => {
+        this.handleIdleWorkerNodeEvent(event, stolenTask)
+        return undefined
+      })
+      .catch(EMPTY_FUNCTION)
+  }
+
+  private readonly workerNodeStealTask = (
+    workerNodeKey: number
+  ): Task<Data> | undefined => {
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1490,22 +1560,27 @@ export abstract class AbstractPool<
           workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
     const sourceWorkerNode = workerNodes.find(
-      workerNode =>
-        workerNode.info.ready &&
-        workerNode.info.id !== workerId &&
-        workerNode.usage.tasks.queued > 0
+      (sourceWorkerNode, sourceWorkerNodeKey) =>
+        sourceWorkerNode.info.ready &&
+        sourceWorkerNodeKey !== workerNodeKey &&
+        sourceWorkerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
       const task = sourceWorkerNode.popTask() as Task<Data>
-      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
-        this.executeTask(destinationWorkerNodeKey, task)
+      if (this.shallExecuteTask(workerNodeKey)) {
+        this.executeTask(workerNodeKey, task)
       } else {
-        this.enqueueTask(destinationWorkerNodeKey, task)
+        this.enqueueTask(workerNodeKey, task)
       }
+      this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+        workerNodeKey,
+        task.name as string
+      )
       this.updateTaskStolenStatisticsWorkerUsage(
-        destinationWorkerNodeKey,
+        workerNodeKey,
         task.name as string
       )
+      return task
     }
   }
 
@@ -1567,16 +1642,15 @@ export abstract class AbstractPool<
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    if (message.ready === false) {
-      throw new Error(
-        `Worker ${message.workerId as number} failed to initialize`
-      )
+    const { workerId, ready, taskFunctionNames } = message
+    if (ready === false) {
+      throw new Error(`Worker ${workerId as number} failed to initialize`)
     }
     const workerInfo = this.getWorkerInfo(
-      this.getWorkerNodeKeyByWorkerId(message.workerId)
+      this.getWorkerNodeKeyByWorkerId(workerId)
     )
-    workerInfo.ready = message.ready as boolean
-    workerInfo.taskFunctionNames = message.taskFunctionNames
+    workerInfo.ready = ready as boolean
+    workerInfo.taskFunctionNames = taskFunctionNames
     if (!this.readyEventEmitted && this.ready) {
       this.readyEventEmitted = true
       this.emitter?.emit(PoolEvents.ready, this.info)
@@ -1584,7 +1658,7 @@ export abstract class AbstractPool<
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const { taskId, workerError, data } = message
+    const { workerId, taskId, workerError, data } = message
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
       const { resolve, reject, workerNodeKey } = promiseResponse
@@ -1597,16 +1671,29 @@ export abstract class AbstractPool<
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)
-      if (
-        this.opts.enableTasksQueue === true &&
-        this.tasksQueueSize(workerNodeKey) > 0 &&
-        this.workerNodes[workerNodeKey].usage.tasks.executing <
-          (this.opts.tasksQueueOptions?.concurrency as number)
-      ) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
+      if (this.opts.enableTasksQueue === true) {
+        const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+        if (
+          this.tasksQueueSize(workerNodeKey) > 0 &&
+          workerNodeTasksUsage.executing <
+            (this.opts.tasksQueueOptions?.concurrency as number)
+        ) {
+          this.executeTask(
+            workerNodeKey,
+            this.dequeueTask(workerNodeKey) as Task<Data>
+          )
+        }
+        if (
+          workerNodeTasksUsage.executing === 0 &&
+          this.tasksQueueSize(workerNodeKey) === 0 &&
+          workerNodeTasksUsage.sequentiallyStolen === 0
+        ) {
+          this.workerNodes[workerNodeKey].dispatchEvent(
+            new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
+              detail: { workerId: workerId as number, workerNodeKey }
+            })
+          )
+        }
       }
     }
   }
index 34a647fbd42f0b8b47fcc32d99d3784ce95242b8..3de3cb809cda9a3a3f2787579371a467e3f9d200 100644 (file)
@@ -1,14 +1,7 @@
 import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
-import {
-  DEFAULT_TASK_NAME,
-  EMPTY_FUNCTION,
-  exponentialDelay,
-  getWorkerId,
-  getWorkerType,
-  sleep
-} from '../utils'
+import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
   type IWorker,
@@ -45,7 +38,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
-  private onIdleWorkerNodeCount: number
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
   /**
@@ -66,7 +58,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
-    this.onIdleWorkerNodeCount = 0
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
@@ -107,20 +98,12 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
 
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.shift()
-    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
-      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.shift()
   }
 
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
-    const task = this.tasksQueue.pop()
-    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
-      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
-    }
-    return task
+    return this.tasksQueue.pop()
   }
 
   /** @inheritdoc */
@@ -179,28 +162,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
-  private async startOnIdleWorkerNode (): Promise<void> {
-    if (
-      this.onIdleWorkerNodeCount > 0 &&
-      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
-    ) {
-      this.onIdleWorkerNodeCount = 0
-      return
-    }
-    ++this.onIdleWorkerNodeCount
-    this.dispatchEvent(
-      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
-        detail: { workerId: this.info.id as number }
-      })
-    )
-    await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
-    await this.startOnIdleWorkerNode()
-  }
-
-  private isIdle (): boolean {
-    return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
-  }
-
   private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
       id: getWorkerId(worker),
@@ -227,6 +188,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get maxQueued (): number {
           return getTasksQueueMaxSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
@@ -268,6 +230,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
         get queued (): number {
           return getTaskFunctionQueueSize()
         },
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
index b3e02f3ceb5ce8631848e65fcfcba871c4dd9e3c..776986d5000f2c2d33cdf8f9daf444038df74ef6 100644 (file)
@@ -104,6 +104,10 @@ export interface TaskStatistics {
    * Maximum number of queued tasks.
    */
   readonly maxQueued?: number
+  /**
+   * Number of sequentially stolen tasks.
+   */
+  sequentiallyStolen: number
   /**
    * Number of stolen tasks.
    */
@@ -223,6 +227,7 @@ export interface IWorker {
  */
 export interface WorkerNodeEventDetail {
   workerId: number
+  workerNodeKey?: number
 }
 
 /**
index 1246a7d05bcc3cb06b81443fb50b9929216c9a0d..04e15c09f40b437d44cb019f52c64e25e3801a21 100644 (file)
@@ -807,6 +807,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -972,6 +973,7 @@ describe('Abstract pool test suite', () => {
           executing: maxMultiplier,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -999,6 +1001,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1040,6 +1043,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1075,6 +1079,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          sequentiallyStolen: 0,
           stolen: 0,
           failed: 0
         },
@@ -1342,6 +1347,7 @@ describe('Abstract pool test suite', () => {
           executing: 0,
           queued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -1516,6 +1522,7 @@ describe('Abstract pool test suite', () => {
             executing: 0,
             failed: 0,
             queued: 0,
+            sequentiallyStolen: 0,
             stolen: 0
           },
           runTime: {
index c9b528b9c2e5b57fedec5cdfc81f71e6f50b7ece..a79be4a39a7b37a6b5d2c3690786103fb25662f0 100644 (file)
@@ -130,6 +130,7 @@ describe('Fixed cluster pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
       expect(workerNode.usage.tasks.stolen).toBe(0)
     }
     expect(queuePool.info.executedTasks).toBe(0)
@@ -157,6 +158,12 @@ describe('Fixed cluster pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
+        0
+      )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
         numberOfWorkers * maxMultiplier
index fc8a6feb65c1bf21d33b4796e9ecaecdbe3a2716..4a2ace0469d1da108fe49e3c5f776ab3ad8a5825 100644 (file)
@@ -278,6 +278,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -333,6 +334,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -558,6 +560,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -616,6 +619,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -755,6 +759,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -823,6 +828,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -972,6 +978,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -1046,6 +1053,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: {
@@ -1201,6 +1209,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1286,6 +1295,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1376,6 +1386,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1576,6 +1587,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1654,6 +1666,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1737,6 +1750,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1999,6 +2013,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -2084,6 +2099,7 @@ describe('Selection strategies test suite', () => {
           queued: 0,
           maxQueued: 0,
           stolen: 0,
+          sequentiallyStolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
index 6b46512ccd4184404d41087e60884fbf814364aa..6a435ffea4b7908a4b09f0fd8ff73f44810985b1 100644 (file)
@@ -130,6 +130,7 @@ describe('Fixed thread pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
       expect(workerNode.usage.tasks.stolen).toBe(0)
     }
     expect(queuePool.info.executedTasks).toBe(0)
@@ -157,6 +158,12 @@ describe('Fixed thread pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
+        0
+      )
+      expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
+        numberOfThreads * maxMultiplier
+      )
       expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
         numberOfThreads * maxMultiplier
index b72c28872dc65b04f55354403cd423ea1b369916..834c3cb6b642844babf0f2a232edfd1d618caa9d 100644 (file)
@@ -58,6 +58,7 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         maxQueued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
@@ -84,7 +85,6 @@ describe('Worker node test suite', () => {
       threadWorkerNode.tasksQueue.size
     )
     expect(threadWorkerNode.onBackPressureStarted).toBe(false)
-    expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0)
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
@@ -101,6 +101,7 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         maxQueued: 0,
+        sequentiallyStolen: 0,
         stolen: 0,
         failed: 0
       },
@@ -127,7 +128,6 @@ describe('Worker node test suite', () => {
       clusterWorkerNode.tasksQueue.size
     )
     expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
-    expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0)
     expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
   })
 
@@ -156,6 +156,7 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         stolen: 0,
+        sequentiallyStolen: 0,
         failed: 0
       },
       runTime: {
@@ -179,6 +180,7 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         stolen: 0,
+        sequentiallyStolen: 0,
         failed: 0
       },
       runTime: {
@@ -202,6 +204,7 @@ describe('Worker node test suite', () => {
         executing: 0,
         queued: 0,
         stolen: 0,
+        sequentiallyStolen: 0,
         failed: 0
       },
       runTime: {