From 85aeb3f356d749b96361e74cf17d403a697e3dd7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 19 Jul 2023 23:41:53 +0200 Subject: [PATCH] feat: add dedicated message channel for threads pool MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Reference: #801 Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 33 +++++++++++---------- src/pools/cluster/fixed.ts | 16 ++++++++++ src/pools/thread/fixed.ts | 34 +++++++++++++++++++++- src/pools/worker-node.ts | 6 +++- src/pools/worker.ts | 5 ++++ src/utility-types.ts | 5 ++++ src/worker/abstract-worker.ts | 21 ++++++------- src/worker/cluster-worker.ts | 12 +++++++- src/worker/thread-worker.ts | 24 +++++++++++++-- tests/pools/abstract/abstract-pool.test.js | 4 ++- tests/worker/thread-worker.test.js | 7 +++-- 11 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 10a2d514..ddf4cbcc 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -30,7 +30,6 @@ import { import type { IWorker, IWorkerNode, - MessageHandler, WorkerInfo, WorkerType, WorkerUsage @@ -878,6 +877,11 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { + const workerInfo = this.getWorkerInfoByWorker(worker) + if (workerInfo.messageChannel != null) { + workerInfo.messageChannel?.port1.close() + workerInfo.messageChannel?.port1.close() + } this.removeWorkerNode(worker) }) @@ -928,12 +932,9 @@ export abstract class AbstractPool< * @param worker - The worker which should register a listener. * @param listener - The message listener callback. */ - private registerWorkerMessageListener( - worker: Worker, - listener: (message: MessageValue) => void - ): void { - worker.on('message', listener as MessageHandler) - } + protected abstract registerWorkerMessageListener< + Message extends Data | Response + >(worker: Worker, listener: (message: MessageValue) => void): void /** * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. @@ -944,18 +945,18 @@ export abstract class AbstractPool< protected afterWorkerSetup (worker: Worker): void { // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) - // Send startup message to worker. - this.sendWorkerStartupMessage(worker) + // Send the startup message to worker. + this.sendStartupMessageToWorker(worker) // Setup worker task statistics computation. this.setWorkerStatistics(worker) } - private sendWorkerStartupMessage (worker: Worker): void { - this.sendToWorker(worker, { - ready: false, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } + /** + * Sends the startup message to the given worker. + * + * @param worker - The worker which should receive the startup message. + */ + protected abstract sendStartupMessageToWorker (worker: Worker): void private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1064,7 +1065,7 @@ export abstract class AbstractPool< * * @param worker - The worker. */ - private getWorkerInfoByWorker (worker: Worker): WorkerInfo { + protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { return this.workerNodes[this.getWorkerNodeKey(worker)].info } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 6e9b4b07..d7e3cc4b 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -73,6 +73,22 @@ export class FixedClusterPool< worker.send(message) } + /** @inheritDoc */ + protected sendStartupMessageToWorker (worker: Worker): void { + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfoByWorker(worker).id as number + }) + } + + /** @inheritDoc */ + protected registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void { + worker.on('message', listener) + } + /** @inheritDoc */ protected createWorker (): Worker { return cluster.fork(this.opts.env) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index c9145f57..52868fd7 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -1,4 +1,6 @@ import { + type MessageChannel, + type MessagePort, SHARE_ENV, Worker, type WorkerOptions, @@ -56,12 +58,42 @@ export class FixedThreadPool< /** @inheritDoc */ protected async destroyWorker (worker: Worker): Promise { this.sendToWorker(worker, { kill: true, workerId: worker.threadId }) + const workerInfo = this.getWorkerInfoByWorker(worker) + workerInfo.messageChannel?.port1.close() + workerInfo.messageChannel?.port2.close() await worker.terminate() } /** @inheritDoc */ protected sendToWorker (worker: Worker, message: MessageValue): void { - worker.postMessage(message) + ( + this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + ).port1.postMessage(message) + } + + /** @inheritDoc */ + protected sendStartupMessageToWorker (worker: Worker): void { + const port2: MessagePort = ( + this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + ).port2 + worker.postMessage( + { + ready: false, + workerId: this.getWorkerInfoByWorker(worker).id as number, + port: port2 + }, + [port2] + ) + } + + /** @inheritDoc */ + protected registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void { + ( + this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel + ).port1.on('message', listener) } /** @inheritDoc */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 76169c89..2c39393b 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,3 +1,4 @@ +import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import { Queue } from '../queue' import type { Task } from '../utility-types' @@ -86,7 +87,10 @@ implements IWorkerNode { id: this.getWorkerId(worker, workerType), type: workerType, dynamic: false, - ready: false + ready: false, + ...(workerType === WorkerTypes.thread && { + messageChannel: new MessageChannel() + }) } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 507396de..b7cd7f67 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,3 +1,4 @@ +import type { MessageChannel } from 'node:worker_threads' import type { CircularArray } from '../circular-array' import type { Task } from '../utility-types' @@ -136,6 +137,10 @@ export interface WorkerInfo { * Ready flag. */ ready: boolean + /** + * Message channel. + */ + messageChannel?: MessageChannel } /** diff --git a/src/utility-types.ts b/src/utility-types.ts index 0723b439..91644713 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,4 +1,5 @@ import type { EventLoopUtilization } from 'node:perf_hooks' +import type { MessagePort } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' import type { IWorker } from './pools/worker' @@ -118,6 +119,10 @@ export interface MessageValue * Whether the worker starts or stops its activity check. */ readonly checkActive?: boolean + /** + * Message port. + */ + readonly port?: MessagePort } /** diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index a431f5a7..77d2e007 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -66,17 +66,17 @@ export abstract class AbstractWorker< * * @param type - The type of async event. * @param isMain - Whether this is the main worker or not. - * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function. * @param mainWorker - Reference to main worker. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function. * @param opts - Options for the worker. */ public constructor ( type: string, protected readonly isMain: boolean, + protected readonly mainWorker: MainWorker, taskFunctions: | WorkerFunction | TaskFunctions, - protected readonly mainWorker: MainWorker, protected readonly opts: WorkerOptions = { /** * The kill behavior option on this worker or its default value. @@ -93,7 +93,7 @@ export abstract class AbstractWorker< this.checkWorkerOptions(this.opts) this.checkTaskFunctions(taskFunctions) if (!this.isMain) { - this.mainWorker?.on('message', this.messageListener.bind(this)) + this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this)) } } @@ -289,12 +289,9 @@ export abstract class AbstractWorker< * * @param message - The received message. */ - protected messageListener (message: MessageValue): void { + protected messageListener (message: MessageValue): void { if (message.workerId === this.id) { - if (message.ready != null) { - // Startup message received - this.sendReadyResponse() - } else if (message.statistics != null) { + if (message.statistics != null) { // Statistics message received this.statistics = message.statistics } else if (message.checkActive != null) { @@ -314,11 +311,11 @@ export abstract class AbstractWorker< } /** - * Sends the ready response to the main worker. + * Handles the ready message sent by the main worker. + * + * @param message - The ready message. */ - protected sendReadyResponse (): void { - !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id }) - } + protected abstract handleReadyMessage (message: MessageValue): void /** * Starts the worker check active interval. diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index c43e7f75..c820641d 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -37,10 +37,20 @@ export class ClusterWorker< super( 'worker-cluster-pool:poolifier', cluster.isPrimary, - taskFunctions, cluster.worker as Worker, + taskFunctions, opts ) + if (!this.isMain) { + this.getMainWorker()?.on('message', this.messageListener.bind(this)) + } + } + + /** @inheritDoc */ + protected handleReadyMessage (message: MessageValue): void { + if (message.workerId === this.id && message.ready != null) { + !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id }) + } } /** @inheritDoc */ diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 09135aff..a8658f03 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -27,6 +27,10 @@ export class ThreadWorker< Data = unknown, Response = unknown > extends AbstractWorker { + /** + * Message port used to communicate with the main thread. + */ + private port!: MessagePort /** * Constructs a new poolifier thread worker. * @@ -42,19 +46,35 @@ export class ThreadWorker< super( 'worker-thread-pool:poolifier', isMainThread, - taskFunctions, parentPort as MessagePort, + taskFunctions, opts ) } + /** @inheritDoc */ + protected handleReadyMessage (message: MessageValue): void { + if ( + message.workerId === this.id && + message.ready != null && + message.port != null + ) { + if (!this.isMain) { + this.port = message.port + this.port.on('message', this.messageListener.bind(this)) + this.sendToMainWorker({ ready: true, workerId: this.id }) + } + } + } + + /** @inheritDoc */ protected get id (): number { return threadId } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { - this.getMainWorker().postMessage(message) + this.port.postMessage(message) } /** @inheritDoc */ diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 62c46184..b26ca43d 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,3 +1,4 @@ +const { MessageChannel } = require('worker_threads') const { expect } = require('expect') const { DynamicClusterPool, @@ -549,7 +550,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true + ready: true, + messageChannel: expect.any(MessageChannel) }) } }) diff --git a/tests/worker/thread-worker.test.js b/tests/worker/thread-worker.test.js index c08adffd..9eeaaa0a 100644 --- a/tests/worker/thread-worker.test.js +++ b/tests/worker/thread-worker.test.js @@ -7,8 +7,9 @@ describe('Thread worker test suite', () => { ++numberOfMessagesPosted } class SpyWorker extends ThreadWorker { - getMainWorker () { - return { postMessage } + constructor (fn) { + super(fn) + this.port = { postMessage } } } @@ -25,7 +26,7 @@ describe('Thread worker test suite', () => { expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage) }) - it('Verify worker invokes the getMainWorker() and postMessage() methods', () => { + it('Verify worker invokes the postMessage() method on port property', () => { const worker = new SpyWorker(() => {}) worker.sendToMainWorker({ ok: 1 }) expect(numberOfMessagesPosted).toBe(1) -- 2.34.1