refactor: cleanup worker error handling code
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Dec 2023 12:26:32 +0000 (13:26 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Dec 2023 12:26:32 +0000 (13:26 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/utils.ts
src/pools/worker-node.ts
tests/pools/worker-node.test.mjs
tests/utils.test.mjs

index 282979852bf564de38942b3d8c86f1dddf0c1831..6b95be9482664deaa1131eced2ccbe3ccb48ca8a 100644 (file)
@@ -1285,7 +1285,6 @@ export abstract class AbstractPool<
       this.opts.errorHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerWorkerEventHandler('error', (error: Error) => {
-      const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
       workerNode.info.ready = false
       this.emitter?.emit(PoolEvents.error, error)
       if (
@@ -1301,7 +1300,7 @@ export abstract class AbstractPool<
         }
       }
       if (this.started && this.opts.enableTasksQueue === true) {
-        this.redistributeQueuedTasks(workerNodeKey)
+        this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
       workerNode.terminate().catch(error => {
         this.emitter?.emit(PoolEvents.error, error)
@@ -1312,7 +1311,7 @@ export abstract class AbstractPool<
       this.opts.exitHandler ?? EMPTY_FUNCTION
     )
     workerNode.registerOnceWorkerEventHandler('exit', () => {
-      this.removeWorkerNode(workerNode.worker)
+      this.removeWorkerNode(workerNode)
     })
     const workerNodeKey = this.addWorkerNode(workerNode)
     this.afterWorkerNodeSetup(workerNodeKey)
@@ -1854,12 +1853,12 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Removes the worker node associated to the given worker from the pool worker nodes.
+   * Removes the worker node from the pool worker nodes.
    *
-   * @param worker - The worker.
+   * @param workerNode - The worker node.
    */
-  private removeWorkerNode (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+  private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+    const workerNodeKey = this.workerNodes.indexOf(workerNode)
     if (workerNodeKey !== -1) {
       this.workerNodes.splice(workerNodeKey, 1)
       this.workerChoiceStrategyContext.remove(workerNodeKey)
index 11c30ad1da9b5cf875be570116fdf240cb0af89b..db8ef7b7cc5991ea71a3dc0a2b163ac9d7a438c2 100644 (file)
@@ -119,7 +119,7 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node without worker node options'
     )
   }
-  if (opts != null && !isPlainObject(opts)) {
+  if (!isPlainObject(opts)) {
     throw new TypeError(
       'Cannot construct a worker node with invalid options: must be a plain object'
     )
index 7a6a1500f962e4cc948ad593f3f40e45d49e53ad..63aff757c5f4c670144af2cbebd4399dc0f766e7 100644 (file)
@@ -49,7 +49,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
    * Constructs a new worker node.
    *
    * @param type - The worker type.
-   * @param filePath - The worker file path.
+   * @param filePath - Path to the worker file.
    * @param opts - The worker node options.
    */
   constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
index cfca3422b0f3ff727138608a65f7542dff63ba35..3b87a8485d39e0547707197c8667238ec35a0265 100644 (file)
@@ -1,4 +1,5 @@
-import { MessageChannel } from 'node:worker_threads'
+import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
+import { Worker as ClusterWorker } from 'node:cluster'
 import { expect } from 'expect'
 import { WorkerNode } from '../../lib/pools/worker-node.js'
 import { WorkerTypes } from '../../lib/index.js'
@@ -118,6 +119,7 @@ describe('Worker node test suite', () => {
       )
     )
     expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
+    expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
     expect(threadWorkerNode.info).toStrictEqual({
       id: threadWorkerNode.worker.threadId,
       type: WorkerTypes.thread,
@@ -160,6 +162,7 @@ describe('Worker node test suite', () => {
     expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
 
     expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
+    expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
     expect(clusterWorkerNode.info).toStrictEqual({
       id: clusterWorkerNode.worker.id,
       type: WorkerTypes.cluster,
index be764cd217a540cac154e8b5ab686ae4bb1070e5..0ae80de24bfa9c2badff5f5127a8e3137c50e6a9 100644 (file)
@@ -85,7 +85,7 @@ describe('Utils test suite', () => {
     const start = performance.now()
     await sleep(1000)
     const elapsed = performance.now() - start
-    expect(elapsed).toBeGreaterThanOrEqual(999)
+    expect(elapsed).toBeGreaterThanOrEqual(1000)
   })
 
   it('Verify exponentialDelay() behavior', () => {