From 75de9f41ce00bec38febd6d82653d3d82f1bb884 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 27 Aug 2023 19:24:39 +0200 Subject: [PATCH] refactor: cleanup direct access to worker id MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 1 - src/pools/cluster/fixed.ts | 10 ++- src/pools/thread/fixed.ts | 15 ++-- src/pools/worker-node.ts | 43 +++-------- src/utils.ts | 44 ++++++++++- tests/pools/abstract/worker-node.test.js | 93 ++++++++++++++++-------- 6 files changed, 131 insertions(+), 75 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index be8ec1a0..482a20db 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1427,7 +1427,6 @@ export abstract class AbstractPool< private addWorkerNode (worker: Worker): number { const workerNode = new WorkerNode( worker, - this.worker, this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index c7c95c4b..75c08535 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -63,7 +63,8 @@ export class FixedClusterPool< protected async destroyWorkerNode (workerNodeKey: number): Promise { this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished - const worker = this.workerNodes[workerNodeKey].worker + const workerNode = this.workerNodes[workerNodeKey] + const worker = workerNode.worker const waitWorkerExit = new Promise((resolve) => { worker.on('exit', () => { resolve() @@ -72,7 +73,10 @@ export class FixedClusterPool< worker.on('disconnect', () => { worker.kill() }) - await this.sendKillMessageToWorker(workerNodeKey, worker.id) + await this.sendKillMessageToWorker( + workerNodeKey, + workerNode.info.id as number + ) worker.disconnect() await waitWorkerExit } @@ -89,7 +93,7 @@ export class FixedClusterPool< protected sendStartupMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { ready: false, - workerId: this.workerNodes[workerNodeKey].worker.id + workerId: this.workerNodes[workerNodeKey].info.id as number }) } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 32107e48..d69055cf 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,7 +67,10 @@ export class FixedThreadPool< resolve() }) }) - await this.sendKillMessageToWorker(workerNodeKey, worker.threadId) + await this.sendKillMessageToWorker( + workerNodeKey, + workerNode.info.id as number + ) workerNode.closeChannel() await worker.terminate() await waitWorkerExit @@ -86,14 +89,14 @@ export class FixedThreadPool< /** @inheritDoc */ protected sendStartupMessageToWorker (workerNodeKey: number): void { - const worker = this.workerNodes[workerNodeKey].worker - const port2: MessagePort = ( - this.workerNodes[workerNodeKey].messageChannel as MessageChannel - ).port2 + const workerNode = this.workerNodes[workerNodeKey] + const worker = workerNode.worker + const port2: MessagePort = (workerNode.messageChannel as MessageChannel) + .port2 worker.postMessage( { ready: false, - workerId: worker.threadId, + workerId: workerNode.info.id, port: port2 }, [port2] diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 03fc0df1..1f46075a 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -5,6 +5,8 @@ import { DEFAULT_TASK_NAME, EMPTY_FUNCTION, exponentialDelay, + getWorkerId, + getWorkerType, sleep } from '../utils' import { Deque } from '../deque' @@ -49,22 +51,13 @@ implements IWorkerNode { * Constructs a new worker node. * * @param worker - The worker. - * @param workerType - The worker type. * @param tasksQueueBackPressureSize - The tasks queue back pressure size. */ - constructor ( - worker: Worker, - workerType: WorkerType, - tasksQueueBackPressureSize: number - ) { + constructor (worker: Worker, tasksQueueBackPressureSize: number) { if (worker == null) { throw new TypeError('Cannot construct a worker node without a worker') } - if (workerType == null) { - throw new TypeError( - 'Cannot construct a worker node without a worker type' - ) - } + if (tasksQueueBackPressureSize == null) { throw new TypeError( 'Cannot construct a worker node without a tasks queue back pressure size' @@ -76,9 +69,9 @@ implements IWorkerNode { ) } this.worker = worker - this.info = this.initWorkerInfo(worker, workerType) + this.info = this.initWorkerInfo(worker) this.usage = this.initWorkerUsage() - if (workerType === WorkerTypes.thread) { + if (this.info.type === WorkerTypes.thread) { this.messageChannel = new MessageChannel() } this.tasksQueueBackPressureSize = tasksQueueBackPressureSize @@ -193,10 +186,10 @@ implements IWorkerNode { await this.startOnEmptyQueue() } - private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { + private initWorkerInfo (worker: Worker): WorkerInfo { return { - id: this.getWorkerId(worker, workerType), - type: workerType, + id: getWorkerId(worker), + type: getWorkerType(worker) as WorkerType, dynamic: false, ready: false } @@ -279,22 +272,4 @@ implements IWorkerNode { } } } - - /** - * Gets the worker id. - * - * @param worker - The worker. - * @param workerType - The worker type. - * @returns The worker id. - */ - private getWorkerId ( - worker: Worker, - workerType: WorkerType - ): number | undefined { - if (workerType === WorkerTypes.thread) { - return worker.threadId - } else if (workerType === WorkerTypes.cluster) { - return worker.id - } - } } diff --git a/src/utils.ts b/src/utils.ts index b84c4c80..8b88d2af 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,11 +1,18 @@ import * as os from 'node:os' import { webcrypto } from 'node:crypto' +import { Worker as ClusterWorker } from 'node:cluster' +import { Worker as ThreadWorker } from 'node:worker_threads' import type { MeasurementStatisticsRequirements, WorkerChoiceStrategyOptions } from './pools/selection-strategies/selection-strategies-types' import type { KillBehavior } from './worker/worker-options' -import type { MeasurementStatistics } from './pools/worker' +import { + type IWorker, + type MeasurementStatistics, + type WorkerType, + WorkerTypes +} from './pools/worker' /** * Default task name. @@ -109,6 +116,41 @@ export const average = (dataSet: number[]): number => { ) } +/** + * Returns the worker type of the given worker. + * + * @param worker - The worker to get the type of. + * @returns The worker type of the given worker. + * @internal + */ +export const getWorkerType = ( + worker: Worker +): WorkerType | undefined => { + if (worker instanceof ThreadWorker) { + return WorkerTypes.thread + } + if (worker instanceof ClusterWorker) { + return WorkerTypes.cluster + } +} + +/** + * Returns the worker id of the given worker. + * + * @param worker - The worker to get the id of. + * @returns The worker id of the given worker. + * @internal + */ +export const getWorkerId = ( + worker: Worker +): number | undefined => { + if (worker instanceof ThreadWorker) { + return worker.threadId + } else if (worker instanceof ClusterWorker) { + return worker.id + } +} + /** * Computes the median of the given data set. * diff --git a/tests/pools/abstract/worker-node.test.js b/tests/pools/abstract/worker-node.test.js index ede278f2..12e05436 100644 --- a/tests/pools/abstract/worker-node.test.js +++ b/tests/pools/abstract/worker-node.test.js @@ -1,4 +1,5 @@ const { MessageChannel, Worker } = require('worker_threads') +const cluster = require('cluster') const { expect } = require('expect') const { WorkerNode } = require('../../../lib/pools/worker-node') const { WorkerTypes } = require('../../../lib') @@ -7,42 +8,74 @@ const { Deque } = require('../../../lib/deque') const { DEFAULT_TASK_NAME } = require('../../../lib/utils') describe('Worker node test suite', () => { - const worker = new Worker('./tests/worker-files/thread/testWorker.js') - const workerNode = new WorkerNode(worker, WorkerTypes.thread, 12) + const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js') + const clusterWorker = cluster.fork() + const threadWorkerNode = new WorkerNode(threadWorker, 12) + const clusterWorkerNode = new WorkerNode(clusterWorker, 12) it('Worker node instantiation', () => { expect(() => new WorkerNode()).toThrowError( new TypeError('Cannot construct a worker node without a worker') ) - expect(() => new WorkerNode(worker)).toThrowError( - new TypeError('Cannot construct a worker node without a worker type') - ) - expect(() => new WorkerNode(worker, WorkerTypes.thread)).toThrowError( + expect(() => new WorkerNode(threadWorker)).toThrowError( new TypeError( 'Cannot construct a worker node without a tasks queue back pressure size' ) ) expect( - () => - new WorkerNode( - worker, - WorkerTypes.thread, - 'invalidTasksQueueBackPressureSize' - ) + () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize') ).toThrowError( new TypeError( 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer' ) ) - expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.worker).toBe(worker) - expect(workerNode.info).toStrictEqual({ - id: worker.threadId, + expect(threadWorkerNode).toBeInstanceOf(WorkerNode) + expect(threadWorkerNode.worker).toBe(threadWorker) + expect(threadWorkerNode.info).toStrictEqual({ + id: threadWorker.threadId, type: WorkerTypes.thread, dynamic: false, ready: false }) - expect(workerNode.usage).toStrictEqual({ + expect(threadWorkerNode.usage).toStrictEqual({ + tasks: { + executed: 0, + executing: 0, + queued: 0, + maxQueued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: expect.any(CircularArray) + }, + waitTime: { + history: expect.any(CircularArray) + }, + elu: { + idle: { + history: expect.any(CircularArray) + }, + active: { + history: expect.any(CircularArray) + } + } + }) + expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel) + expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12) + expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(threadWorkerNode.tasksQueue.size).toBe(0) + expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) + + expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) + expect(clusterWorkerNode.worker).toBe(clusterWorker) + expect(clusterWorkerNode.info).toStrictEqual({ + id: clusterWorker.id, + type: WorkerTypes.cluster, + dynamic: false, + ready: false + }) + expect(clusterWorkerNode.usage).toStrictEqual({ tasks: { executed: 0, executing: 0, @@ -66,32 +99,32 @@ describe('Worker node test suite', () => { } } }) - expect(workerNode.messageChannel).toBeInstanceOf(MessageChannel) - expect(workerNode.tasksQueueBackPressureSize).toBe(12) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) - expect(workerNode.tasksQueue.size).toBe(0) - expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map) + expect(clusterWorkerNode.messageChannel).toBeUndefined() + expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12) + expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(clusterWorkerNode.tasksQueue.size).toBe(0) + expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) it('Worker node getTaskFunctionWorkerUsage()', () => { expect(() => - workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') + threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrowError( new TypeError( "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined" ) ) - workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1'] + threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1'] expect(() => - workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') + threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrowError( new TypeError( "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements" ) ) - workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] + threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] expect( - workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual({ tasks: { executed: 0, @@ -115,7 +148,7 @@ describe('Worker node test suite', () => { } } }) - expect(workerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({ + expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({ tasks: { executed: 0, executing: 0, @@ -138,7 +171,7 @@ describe('Worker node test suite', () => { } } }) - expect(workerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({ + expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({ tasks: { executed: 0, executing: 0, @@ -161,6 +194,6 @@ describe('Worker node test suite', () => { } } }) - expect(workerNode.taskFunctionsUsage.size).toBe(2) + expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) }) }) -- 2.34.1