From a5d152043363ea21ec667205c88e0b9f2a6ff04e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 17 Aug 2023 19:23:50 +0200 Subject: [PATCH] fix: fix race condition between ready and task functions message at startup MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 24 +++++++++++++----------- src/pools/worker-node.ts | 4 ++-- src/pools/worker.ts | 2 +- src/worker/cluster-worker.ts | 13 ++++++++++--- src/worker/thread-worker.ts | 13 ++++++++++--- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e7c27cda..f89af5ac 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -678,12 +678,11 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( name != null && - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes( - name - ) + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) @@ -695,7 +694,7 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -816,9 +815,10 @@ export abstract class AbstractPool< } private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) return ( - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1 + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 1 ) } @@ -1125,7 +1125,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { @@ -1144,9 +1144,11 @@ export abstract class AbstractPool< if (message.ready === false) { throw new Error(`Worker ${message.workerId} failed to initialize`) } - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } @@ -1217,7 +1219,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 4349ed72..292b39df 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -88,9 +88,9 @@ implements IWorkerNode { /** @inheritdoc */ public getTaskWorkerUsage (name: string): WorkerUsage | undefined { - if (name === DEFAULT_TASK_NAME && !Array.isArray(this.info.taskFunctions)) { + if (!Array.isArray(this.info.taskFunctions)) { throw new Error( - 'Cannot get task worker usage for default task function name when task function names list is not yet defined' + `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined` ) } if ( diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 5ac90fbf..1de3cf09 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -247,7 +247,7 @@ export interface IWorkerNode { */ readonly resetUsage: () => void /** - * Close communication channel. + * Closes communication channel. */ readonly closeChannel: () => void /** diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index e5bcb727..35899e79 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -46,10 +46,17 @@ export class ClusterWorker< if (message.workerId === this.id && message.ready === false) { try { this.getMainWorker()?.on('message', this.messageListener.bind(this)) - this.sendTaskFunctionsListToMainWorker() - this.sendToMainWorker({ ready: true, workerId: this.id }) + this.sendToMainWorker({ + ready: true, + taskFunctions: this.listTaskFunctions(), + workerId: this.id + }) } catch { - this.sendToMainWorker({ ready: false, workerId: this.id }) + this.sendToMainWorker({ + ready: false, + taskFunctions: this.listTaskFunctions(), + workerId: this.id + }) } } } diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 6f36664b..8d30ef21 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -60,10 +60,17 @@ export class ThreadWorker< try { this.port = message.port this.port.on('message', this.messageListener.bind(this)) - this.sendTaskFunctionsListToMainWorker() - this.sendToMainWorker({ ready: true, workerId: this.id }) + this.sendToMainWorker({ + ready: true, + taskFunctions: this.listTaskFunctions(), + workerId: this.id + }) } catch { - this.sendToMainWorker({ ready: false, workerId: this.id }) + this.sendToMainWorker({ + ready: false, + taskFunctions: this.listTaskFunctions(), + workerId: this.id + }) } } } -- 2.34.1