From: Jérôme Benoit Date: Thu, 17 Aug 2023 22:16:27 +0000 (+0200) Subject: refactor: move message channel property from worker info to node X-Git-Tag: v2.6.29~21 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=7884d1837ee55026fe5204a56f4ebeca17e7e7dd;hp=12a8645b5b7fdcb0f1f46403b90ebcc1a8aca505;p=poolifier.git refactor: move message channel property from worker info to node Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 13e4a11a..8dd4e7e3 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -80,7 +80,7 @@ export class FixedThreadPool< transferList?: TransferListItem[] ): void { ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port1.postMessage(message, transferList) } @@ -88,7 +88,7 @@ export class FixedThreadPool< protected sendStartupMessageToWorker (workerNodeKey: number): void { const worker = this.workerNodes[workerNodeKey].worker const port2: MessagePort = ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port2 worker.postMessage( { @@ -106,7 +106,7 @@ export class FixedThreadPool< listener: (message: MessageValue) => void ): void { ( - this.getWorkerInfo(workerNodeKey).messageChannel as MessageChannel + this.workerNodes[workerNodeKey].messageChannel as MessageChannel ).port1.on('message', listener) } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 292b39df..a5482dcd 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -22,6 +22,7 @@ export class WorkerNode implements IWorkerNode { public readonly worker: Worker public readonly info: WorkerInfo + public messageChannel?: MessageChannel public usage: WorkerUsage private readonly tasksUsage: Map private readonly tasksQueue: Queue> @@ -35,6 +36,9 @@ implements IWorkerNode { constructor (worker: Worker, workerType: WorkerType) { this.worker = worker this.info = this.initWorkerInfo(worker, workerType) + if (workerType === WorkerTypes.thread) { + this.messageChannel = new MessageChannel() + } this.usage = this.initWorkerUsage() this.tasksUsage = new Map() this.tasksQueue = new Queue>() @@ -77,12 +81,12 @@ implements IWorkerNode { /** @inheritdoc */ public closeChannel (): void { - if (this.info.messageChannel != null) { - this.info.messageChannel?.port1.unref() - this.info.messageChannel?.port2.unref() - this.info.messageChannel?.port1.close() - this.info.messageChannel?.port2.close() - delete this.info.messageChannel + if (this.messageChannel != null) { + this.messageChannel?.port1.unref() + this.messageChannel?.port2.unref() + this.messageChannel?.port1.close() + this.messageChannel?.port2.close() + delete this.messageChannel } } @@ -111,10 +115,7 @@ implements IWorkerNode { id: this.getWorkerId(worker, workerType), type: workerType, dynamic: false, - ready: false, - ...(workerType === WorkerTypes.thread && { - messageChannel: new MessageChannel() - }) + ready: false } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 1de3cf09..09253242 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -141,10 +141,6 @@ export interface WorkerInfo { * Task function names. */ taskFunctions?: string[] - /** - * Message channel. - */ - messageChannel?: MessageChannel } /** @@ -215,6 +211,10 @@ export interface IWorkerNode { * Worker info. */ readonly info: WorkerInfo + /** + * Message channel. + */ + readonly messageChannel?: MessageChannel /** * Worker usage statistics. */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5af15912..5c367aa2 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,4 +1,3 @@ -const { MessageChannel } = require('worker_threads') const { expect } = require('expect') const { DynamicClusterPool, @@ -588,8 +587,7 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true, - messageChannel: expect.any(MessageChannel) + ready: true }) } })