From 9974369e3530cf80c21794b59bcbbc4c72e505eb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 16 Dec 2023 13:26:32 +0100 Subject: [PATCH] refactor: cleanup worker error handling code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 13 ++++++------- src/pools/utils.ts | 2 +- src/pools/worker-node.ts | 2 +- tests/pools/worker-node.test.mjs | 5 ++++- tests/utils.test.mjs | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 28297985..6b95be94 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1285,7 +1285,6 @@ export abstract class AbstractPool< this.opts.errorHandler ?? EMPTY_FUNCTION ) workerNode.registerWorkerEventHandler('error', (error: Error) => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker) workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1301,7 +1300,7 @@ export abstract class AbstractPool< } } if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } workerNode.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) @@ -1312,7 +1311,7 @@ export abstract class AbstractPool< this.opts.exitHandler ?? EMPTY_FUNCTION ) workerNode.registerOnceWorkerEventHandler('exit', () => { - this.removeWorkerNode(workerNode.worker) + this.removeWorkerNode(workerNode) }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1854,12 +1853,12 @@ export abstract class AbstractPool< } /** - * Removes the worker node associated to the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 11c30ad1..db8ef7b7 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -119,7 +119,7 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node without worker node options' ) } - if (opts != null && !isPlainObject(opts)) { + if (!isPlainObject(opts)) { throw new TypeError( 'Cannot construct a worker node with invalid options: must be a plain object' ) diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 7a6a1500..63aff757 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -49,7 +49,7 @@ export class WorkerNode * Constructs a new worker node. * * @param type - The worker type. - * @param filePath - The worker file path. + * @param filePath - Path to the worker file. * @param opts - The worker node options. */ constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) { diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index cfca3422..3b87a848 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -1,4 +1,5 @@ -import { MessageChannel } from 'node:worker_threads' +import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' +import { Worker as ClusterWorker } from 'node:cluster' import { expect } from 'expect' import { WorkerNode } from '../../lib/pools/worker-node.js' import { WorkerTypes } from '../../lib/index.js' @@ -118,6 +119,7 @@ describe('Worker node test suite', () => { ) ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) + expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker) expect(threadWorkerNode.info).toStrictEqual({ id: threadWorkerNode.worker.threadId, type: WorkerTypes.thread, @@ -160,6 +162,7 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) + expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker) expect(clusterWorkerNode.info).toStrictEqual({ id: clusterWorkerNode.worker.id, type: WorkerTypes.cluster, diff --git a/tests/utils.test.mjs b/tests/utils.test.mjs index be764cd2..0ae80de2 100644 --- a/tests/utils.test.mjs +++ b/tests/utils.test.mjs @@ -85,7 +85,7 @@ describe('Utils test suite', () => { const start = performance.now() await sleep(1000) const elapsed = performance.now() - start - expect(elapsed).toBeGreaterThanOrEqual(999) + expect(elapsed).toBeGreaterThanOrEqual(1000) }) it('Verify exponentialDelay() behavior', () => { -- 2.34.1