refactor: cleanup direct access to worker id
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 27 Aug 2023 17:24:39 +0000 (19:24 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 27 Aug 2023 17:24:39 +0000 (19:24 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/utils.ts
tests/pools/abstract/worker-node.test.js

index be8ec1a0125644bbcb00b3a5ec48e3056e16ed11..482a20db4fc044d0f118b6d7ae2cc9f5af962de9 100644 (file)
@@ -1427,7 +1427,6 @@ export abstract class AbstractPool<
   private addWorkerNode (worker: Worker): number {
     const workerNode = new WorkerNode<Worker, Data>(
       worker,
-      this.worker,
       this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
     )
     // Flag the worker node as ready at pool startup.
index c7c95c4b3dad537b39dfd44d37481178035fcb4c..75c0853586c21739ff217eb3be4e63c64e7bb271 100644 (file)
@@ -63,7 +63,8 @@ export class FixedClusterPool<
   protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
     this.flushTasksQueue(workerNodeKey)
     // FIXME: wait for tasks to be finished
-    const worker = this.workerNodes[workerNodeKey].worker
+    const workerNode = this.workerNodes[workerNodeKey]
+    const worker = workerNode.worker
     const waitWorkerExit = new Promise<void>((resolve) => {
       worker.on('exit', () => {
         resolve()
@@ -72,7 +73,10 @@ export class FixedClusterPool<
     worker.on('disconnect', () => {
       worker.kill()
     })
-    await this.sendKillMessageToWorker(workerNodeKey, worker.id)
+    await this.sendKillMessageToWorker(
+      workerNodeKey,
+      workerNode.info.id as number
+    )
     worker.disconnect()
     await waitWorkerExit
   }
@@ -89,7 +93,7 @@ export class FixedClusterPool<
   protected sendStartupMessageToWorker (workerNodeKey: number): void {
     this.sendToWorker(workerNodeKey, {
       ready: false,
-      workerId: this.workerNodes[workerNodeKey].worker.id
+      workerId: this.workerNodes[workerNodeKey].info.id as number
     })
   }
 
index 32107e48635314e25269cc37e2c0ad7fdd3fcd75..d69055cf0909efa3b9cf45983163e0be6a35c513 100644 (file)
@@ -67,7 +67,10 @@ export class FixedThreadPool<
         resolve()
       })
     })
-    await this.sendKillMessageToWorker(workerNodeKey, worker.threadId)
+    await this.sendKillMessageToWorker(
+      workerNodeKey,
+      workerNode.info.id as number
+    )
     workerNode.closeChannel()
     await worker.terminate()
     await waitWorkerExit
@@ -86,14 +89,14 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   protected sendStartupMessageToWorker (workerNodeKey: number): void {
-    const worker = this.workerNodes[workerNodeKey].worker
-    const port2: MessagePort = (
-      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
-    ).port2
+    const workerNode = this.workerNodes[workerNodeKey]
+    const worker = workerNode.worker
+    const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
+      .port2
     worker.postMessage(
       {
         ready: false,
-        workerId: worker.threadId,
+        workerId: workerNode.info.id,
         port: port2
       },
       [port2]
index 03fc0df192f3ce6689306ba3f6eb38c12dabed40..1f46075a07e2a4849b6dcdd4a2f996fcea64e6be 100644 (file)
@@ -5,6 +5,8 @@ import {
   DEFAULT_TASK_NAME,
   EMPTY_FUNCTION,
   exponentialDelay,
+  getWorkerId,
+  getWorkerType,
   sleep
 } from '../utils'
 import { Deque } from '../deque'
@@ -49,22 +51,13 @@ implements IWorkerNode<Worker, Data> {
    * Constructs a new worker node.
    *
    * @param worker - The worker.
-   * @param workerType - The worker type.
    * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
    */
-  constructor (
-    worker: Worker,
-    workerType: WorkerType,
-    tasksQueueBackPressureSize: number
-  ) {
+  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
     if (worker == null) {
       throw new TypeError('Cannot construct a worker node without a worker')
     }
-    if (workerType == null) {
-      throw new TypeError(
-        'Cannot construct a worker node without a worker type'
-      )
-    }
+
     if (tasksQueueBackPressureSize == null) {
       throw new TypeError(
         'Cannot construct a worker node without a tasks queue back pressure size'
@@ -76,9 +69,9 @@ implements IWorkerNode<Worker, Data> {
       )
     }
     this.worker = worker
-    this.info = this.initWorkerInfo(worker, workerType)
+    this.info = this.initWorkerInfo(worker)
     this.usage = this.initWorkerUsage()
-    if (workerType === WorkerTypes.thread) {
+    if (this.info.type === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
     this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
@@ -193,10 +186,10 @@ implements IWorkerNode<Worker, Data> {
     await this.startOnEmptyQueue()
   }
 
-  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
+  private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
-      id: this.getWorkerId(worker, workerType),
-      type: workerType,
+      id: getWorkerId(worker),
+      type: getWorkerType(worker) as WorkerType,
       dynamic: false,
       ready: false
     }
@@ -279,22 +272,4 @@ implements IWorkerNode<Worker, Data> {
       }
     }
   }
-
-  /**
-   * Gets the worker id.
-   *
-   * @param worker - The worker.
-   * @param workerType - The worker type.
-   * @returns The worker id.
-   */
-  private getWorkerId (
-    worker: Worker,
-    workerType: WorkerType
-  ): number | undefined {
-    if (workerType === WorkerTypes.thread) {
-      return worker.threadId
-    } else if (workerType === WorkerTypes.cluster) {
-      return worker.id
-    }
-  }
 }
index b84c4c803efe974b1dde4d8e3988465bef6d94b7..8b88d2afc90424bdc23a4e2746f331295770b15c 100644 (file)
@@ -1,11 +1,18 @@
 import * as os from 'node:os'
 import { webcrypto } from 'node:crypto'
+import { Worker as ClusterWorker } from 'node:cluster'
+import { Worker as ThreadWorker } from 'node:worker_threads'
 import type {
   MeasurementStatisticsRequirements,
   WorkerChoiceStrategyOptions
 } from './pools/selection-strategies/selection-strategies-types'
 import type { KillBehavior } from './worker/worker-options'
-import type { MeasurementStatistics } from './pools/worker'
+import {
+  type IWorker,
+  type MeasurementStatistics,
+  type WorkerType,
+  WorkerTypes
+} from './pools/worker'
 
 /**
  * Default task name.
@@ -109,6 +116,41 @@ export const average = (dataSet: number[]): number => {
   )
 }
 
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = <Worker extends IWorker>(
+  worker: Worker
+): WorkerType | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return WorkerTypes.thread
+  }
+  if (worker instanceof ClusterWorker) {
+    return WorkerTypes.cluster
+  }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = <Worker extends IWorker>(
+  worker: Worker
+): number | undefined => {
+  if (worker instanceof ThreadWorker) {
+    return worker.threadId
+  } else if (worker instanceof ClusterWorker) {
+    return worker.id
+  }
+}
+
 /**
  * Computes the median of the given data set.
  *
index ede278f2a89c6eace999bc0837928393f7a5293c..12e05436bb001dfb07fb1421a1e0be1cececa901 100644 (file)
@@ -1,4 +1,5 @@
 const { MessageChannel, Worker } = require('worker_threads')
+const cluster = require('cluster')
 const { expect } = require('expect')
 const { WorkerNode } = require('../../../lib/pools/worker-node')
 const { WorkerTypes } = require('../../../lib')
@@ -7,42 +8,74 @@ const { Deque } = require('../../../lib/deque')
 const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
 
 describe('Worker node test suite', () => {
-  const worker = new Worker('./tests/worker-files/thread/testWorker.js')
-  const workerNode = new WorkerNode(worker, WorkerTypes.thread, 12)
+  const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js')
+  const clusterWorker = cluster.fork()
+  const threadWorkerNode = new WorkerNode(threadWorker, 12)
+  const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
 
   it('Worker node instantiation', () => {
     expect(() => new WorkerNode()).toThrowError(
       new TypeError('Cannot construct a worker node without a worker')
     )
-    expect(() => new WorkerNode(worker)).toThrowError(
-      new TypeError('Cannot construct a worker node without a worker type')
-    )
-    expect(() => new WorkerNode(worker, WorkerTypes.thread)).toThrowError(
+    expect(() => new WorkerNode(threadWorker)).toThrowError(
       new TypeError(
         'Cannot construct a worker node without a tasks queue back pressure size'
       )
     )
     expect(
-      () =>
-        new WorkerNode(
-          worker,
-          WorkerTypes.thread,
-          'invalidTasksQueueBackPressureSize'
-        )
+      () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
     ).toThrowError(
       new TypeError(
         'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
       )
     )
-    expect(workerNode).toBeInstanceOf(WorkerNode)
-    expect(workerNode.worker).toBe(worker)
-    expect(workerNode.info).toStrictEqual({
-      id: worker.threadId,
+    expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
+    expect(threadWorkerNode.worker).toBe(threadWorker)
+    expect(threadWorkerNode.info).toStrictEqual({
+      id: threadWorker.threadId,
       type: WorkerTypes.thread,
       dynamic: false,
       ready: false
     })
-    expect(workerNode.usage).toStrictEqual({
+    expect(threadWorkerNode.usage).toStrictEqual({
+      tasks: {
+        executed: 0,
+        executing: 0,
+        queued: 0,
+        maxQueued: 0,
+        stolen: 0,
+        failed: 0
+      },
+      runTime: {
+        history: expect.any(CircularArray)
+      },
+      waitTime: {
+        history: expect.any(CircularArray)
+      },
+      elu: {
+        idle: {
+          history: expect.any(CircularArray)
+        },
+        active: {
+          history: expect.any(CircularArray)
+        }
+      }
+    })
+    expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
+    expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
+    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(threadWorkerNode.tasksQueue.size).toBe(0)
+    expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
+
+    expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
+    expect(clusterWorkerNode.worker).toBe(clusterWorker)
+    expect(clusterWorkerNode.info).toStrictEqual({
+      id: clusterWorker.id,
+      type: WorkerTypes.cluster,
+      dynamic: false,
+      ready: false
+    })
+    expect(clusterWorkerNode.usage).toStrictEqual({
       tasks: {
         executed: 0,
         executing: 0,
@@ -66,32 +99,32 @@ describe('Worker node test suite', () => {
         }
       }
     })
-    expect(workerNode.messageChannel).toBeInstanceOf(MessageChannel)
-    expect(workerNode.tasksQueueBackPressureSize).toBe(12)
-    expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
-    expect(workerNode.tasksQueue.size).toBe(0)
-    expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map)
+    expect(clusterWorkerNode.messageChannel).toBeUndefined()
+    expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
+    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(clusterWorkerNode.tasksQueue.size).toBe(0)
+    expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
   })
 
   it('Worker node getTaskFunctionWorkerUsage()', () => {
     expect(() =>
-      workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
+      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
     ).toThrowError(
       new TypeError(
         "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
       )
     )
-    workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
+    threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
     expect(() =>
-      workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
+      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
     ).toThrowError(
       new TypeError(
         "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
       )
     )
-    workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+    threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
     expect(
-      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+      threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
     ).toStrictEqual({
       tasks: {
         executed: 0,
@@ -115,7 +148,7 @@ describe('Worker node test suite', () => {
         }
       }
     })
-    expect(workerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
+    expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
       tasks: {
         executed: 0,
         executing: 0,
@@ -138,7 +171,7 @@ describe('Worker node test suite', () => {
         }
       }
     })
-    expect(workerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
+    expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
       tasks: {
         executed: 0,
         executing: 0,
@@ -161,6 +194,6 @@ describe('Worker node test suite', () => {
         }
       }
     })
-    expect(workerNode.taskFunctionsUsage.size).toBe(2)
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
   })
 })