From 7884d1837ee55026fe5204a56f4ebeca17e7e7dd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 18 Aug 2023 00:16:27 +0200 Subject: [PATCH] refactor: move message channel property from worker info to node MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/thread/fixed.ts | 6 +++--- src/pools/worker-node.ts | 21 +++++++++++---------- src/pools/worker.ts | 8 ++++---- tests/pools/abstract/abstract-pool.test.js | 4 +--- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 13e4a11a..8dd4e7e3 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -80,7 +80,7 @@ export class FixedThreadPool< transferList?: TransferListItem[] ): void { ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port1.postMessage(message, transferList) } @@ -88,7 +88,7 @@ export class FixedThreadPool< protected sendStartupMessageToWorker (workerNodeKey: number): void { const worker = this.workerNodes[workerNodeKey].worker const port2: MessagePort = ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port2 worker.postMessage( { @@ -106,7 +106,7 @@ export class FixedThreadPool< listener: (message: MessageValue) => void ): void { ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port1.on('message', listener) } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 292b39df..a5482dcd 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -22,6 +22,7 @@ export class WorkerNode implements IWorkerNode { public readonly worker: Worker public readonly info: WorkerInfo + public messageChannel?: MessageChannel public usage: WorkerUsage private readonly tasksUsage: Map private readonly tasksQueue: Queue> @@ -35,6 +36,9 @@ implements IWorkerNode { constructor (worker: Worker, workerType: WorkerType) { this.worker = worker this.info = this.initWorkerInfo(worker, workerType) + if (workerType === WorkerTypes.thread) { + this.messageChannel = new MessageChannel() + } this.usage = this.initWorkerUsage() this.tasksUsage = new Map() this.tasksQueue = new Queue>() @@ -77,12 +81,12 @@ implements IWorkerNode { /** @inheritdoc */ public closeChannel (): void { - if (this.info.messageChannel != null) { - this.info.messageChannel?.port1.unref() - this.info.messageChannel?.port2.unref() - this.info.messageChannel?.port1.close() - this.info.messageChannel?.port2.close() - delete this.info.messageChannel + if (this.messageChannel != null) { + this.messageChannel?.port1.unref() + this.messageChannel?.port2.unref() + this.messageChannel?.port1.close() + this.messageChannel?.port2.close() + delete this.messageChannel } } @@ -111,10 +115,7 @@ implements IWorkerNode { id: this.getWorkerId(worker, workerType), type: workerType, dynamic: false, - ready: false, - ...(workerType === WorkerTypes.thread && { - messageChannel: new MessageChannel() - }) + ready: false } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 1de3cf09..09253242 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -141,10 +141,6 @@ export interface WorkerInfo { * Task function names. */ taskFunctions?: string[] - /** - * Message channel. - */ - messageChannel?: MessageChannel } /** @@ -215,6 +211,10 @@ export interface IWorkerNode { * Worker info. */ readonly info: WorkerInfo + /** + * Message channel. + */ + readonly messageChannel?: MessageChannel /** * Worker usage statistics. */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5af15912..5c367aa2 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,4 +1,3 @@ -const { MessageChannel } = require('worker_threads') const { expect } = require('expect') const { DynamicClusterPool, @@ -588,8 +587,7 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true, - messageChannel: expect.any(MessageChannel) + ready: true }) } }) -- 2.34.1