From: Jérôme Benoit Date: Sat, 8 Jul 2023 20:42:41 +0000 (+0200) Subject: feat: internal messaging strict worker id checking X-Git-Tag: v2.6.10~6 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=21f710aa73abbb5d90328cfb199adfc0f7a70406;p=poolifier.git feat: internal messaging strict worker id checking Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6c9a0fd1..687ae25f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -162,6 +162,10 @@ export abstract class AbstractPool< throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) + } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' + ) } else if (this.type === PoolTypes.dynamic && min === max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' @@ -462,6 +466,17 @@ export abstract class AbstractPool< ?.worker } + private checkMessageWorkerId (message: MessageValue): void { + if ( + message.workerId != null && + this.getWorkerById(message.workerId) == null + ) { + throw new Error( + `Worker message received from unknown worker '${message.workerId}'` + ) + } + } + /** * Gets the given worker its worker node key. * @@ -573,6 +588,7 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, + workerId: this.getWorkerInfo(workerNodeKey).id as number, id: randomUUID() } const res = new Promise((resolve, reject) => { @@ -894,7 +910,7 @@ export abstract class AbstractPool< // Send startup message to worker. this.sendToWorker(worker, { ready: false, - workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number }) // Setup worker task statistics computation. this.setWorkerStatistics(worker) @@ -988,8 +1004,12 @@ export abstract class AbstractPool< void (this.destroyWorker(worker) as Promise) } }) - this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true - this.sendToWorker(worker, { checkAlive: true }) + const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker)) + workerInfo.dynamic = true + this.sendToWorker(worker, { + checkAlive: true, + workerId: workerInfo.id as number + }) return worker } @@ -1000,6 +1020,7 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { + this.checkMessageWorkerId(message) if (message.ready != null && message.workerId != null) { // Worker ready message received this.handleWorkerReadyMessage(message) @@ -1011,17 +1032,9 @@ export abstract class AbstractPool< } private handleWorkerReadyMessage (message: MessageValue): void { - const worker = this.getWorkerById(message.workerId as number) - if (worker != null) { - this.getWorkerInfo(this.getWorkerNodeKey(worker)).ready = - message.ready as boolean - } else { - throw new Error( - `Worker ready message received from unknown worker '${ - message.workerId as number - }'` - ) - } + const worker = this.getWorkerById(message.workerId) + this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready = + message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } @@ -1138,7 +1151,8 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - } + }, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number }) } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 3c69ec2f..6e9b4b07 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -61,7 +61,7 @@ export class FixedClusterPool< /** @inheritDoc */ protected destroyWorker (worker: Worker): void { - this.sendToWorker(worker, { kill: true }) + this.sendToWorker(worker, { kill: true, workerId: worker.id }) worker.on('disconnect', () => { worker.kill() }) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 29f00880..c9145f57 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -55,7 +55,7 @@ export class FixedThreadPool< /** @inheritDoc */ protected async destroyWorker (worker: Worker): Promise { - this.sendToWorker(worker, { kill: true }) + this.sendToWorker(worker, { kill: true, workerId: worker.threadId }) await worker.terminate() } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 5b427a56..c6303221 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -36,6 +36,10 @@ export type ExitHandler = ( * @internal */ export interface Task { + /** + * Worker id. + */ + readonly workerId: number /** * Task name. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index f1b773eb..a8da491e 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -8,10 +8,6 @@ import type { IWorker, Task } from './pools/worker' * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ export interface TaskError { - /** - * Worker id. - */ - readonly workerId: number /** * Error message. */ @@ -61,10 +57,6 @@ export interface WorkerStatistics { */ export interface MessageValue extends Task { - /** - * Worker id. - */ - readonly workerId?: number /** * Kill code. */ diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 4e92be01..8122f909 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -148,13 +148,17 @@ export abstract class AbstractWorker< if (message.ready != null && message.workerId === this.id) { // Startup message received this.workerReady() - } else if (message.statistics != null) { + } else if (message.statistics != null && message.workerId === this.id) { // Statistics message received this.statistics = message.statistics - } else if (message.checkAlive != null) { + } else if (message.checkAlive != null && message.workerId === this.id) { // Check alive message received message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive() - } else if (message.id != null && message.data != null) { + } else if ( + message.id != null && + message.data != null && + message.workerId === this.id + ) { // Task message received const fn = this.getTaskFunction(message.name) if (isAsyncFunction(fn)) { @@ -162,7 +166,7 @@ export abstract class AbstractWorker< } else { this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } - } else if (message.kill === true) { + } else if (message.kill === true && message.workerId === this.id) { // Kill message received this.stopCheckAlive() this.emitDestroy() @@ -203,7 +207,7 @@ export abstract class AbstractWorker< performance.now() - this.lastTaskTimestamp > (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) ) { - this.sendToMainWorker({ kill: this.opts.killBehavior }) + this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id }) } } @@ -262,10 +266,10 @@ export abstract class AbstractWorker< const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - workerId: this.id, message: errorMessage, data: message.data }, + workerId: this.id, id: message.id }) } finally { @@ -301,10 +305,10 @@ export abstract class AbstractWorker< const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - workerId: this.id, message: errorMessage, data: message.data }, + workerId: this.id, id: message.id }) }) diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index caec34fd..6c340ed9 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -16,13 +16,6 @@ const { waitPoolEvents } = require('../../test-utils') describe('Abstract pool test suite', () => { const numberOfWorkers = 2 - class StubPoolWithRemoveAllWorker extends FixedThreadPool { - removeAllWorker () { - this.workerNodes = [] - this.promiseResponseMap.clear() - this.handleWorkerReadyMessage = () => {} - } - } class StubPoolWithIsMain extends FixedThreadPool { isMain () { return false @@ -99,6 +92,14 @@ describe('Abstract pool test suite', () => { 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' ) ) + expect( + () => + new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js') + ).toThrowError( + new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' + ) + ) }) it('Verify that pool options are checked', async () => { @@ -457,21 +458,6 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Simulate worker not found', async () => { - const pool = new StubPoolWithRemoveAllWorker( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - errorHandler: e => console.error(e) - } - ) - expect(pool.workerNodes.length).toBe(numberOfWorkers) - // Simulate worker not found. - pool.removeAllWorker() - expect(pool.workerNodes.length).toBe(0) - await pool.destroy() - }) - it('Verify that pool worker tasks usage are initialized', async () => { const pool = new FixedClusterPool( numberOfWorkers, diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index e0ad0ccc..f9423e73 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -147,7 +147,6 @@ describe('Fixed cluster pool test suite', () => { expect(typeof inError === 'string').toBe(true) expect(inError).toBe('Error Message from ClusterWorker') expect(taskError).toStrictEqual({ - workerId: expect.any(Number), message: 'Error Message from ClusterWorker', data }) @@ -174,7 +173,6 @@ describe('Fixed cluster pool test suite', () => { expect(typeof inError === 'string').toBe(true) expect(inError).toBe('Error Message from ClusterWorker:async') expect(taskError).toStrictEqual({ - workerId: expect.any(Number), message: 'Error Message from ClusterWorker:async', data }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 4ffc3547..829c8ade 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -149,7 +149,6 @@ describe('Fixed thread pool test suite', () => { expect(typeof inError.message === 'string').toBe(true) expect(inError.message).toBe('Error Message from ThreadWorker') expect(taskError).toStrictEqual({ - workerId: expect.any(Number), message: new Error('Error Message from ThreadWorker'), data }) @@ -178,7 +177,6 @@ describe('Fixed thread pool test suite', () => { expect(typeof inError.message === 'string').toBe(true) expect(inError.message).toBe('Error Message from ThreadWorker:async') expect(taskError).toStrictEqual({ - workerId: expect.any(Number), message: new Error('Error Message from ThreadWorker:async'), data })