From 0d80593b9a7596645612087f687fc6f5cab3101a Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 5 May 2023 23:21:58 +0200 Subject: [PATCH 1/1] refactor: factor out inputs type check MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 50 +++++++++++++------ .../fair-share-worker-choice-strategy.ts | 8 ++- ...hted-round-robin-worker-choice-strategy.ts | 6 +-- src/utils.ts | 6 +++ src/worker/abstract-worker.ts | 21 ++++---- tests/pools/abstract/abstract-pool.test.js | 2 +- tests/worker/abstract-worker.test.js | 2 +- 7 files changed, 58 insertions(+), 37 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 71067f5c..820656bb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -3,6 +3,7 @@ import type { MessageValue, PromiseResponseWrapper } from '../utility-types' import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + isPlainObject, median } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' @@ -126,7 +127,7 @@ export abstract class AbstractPool< ) } else if (!Number.isSafeInteger(numberOfWorkers)) { throw new TypeError( - 'Cannot instantiate a pool with a non integer number of workers' + 'Cannot instantiate a pool with a non safe integer number of workers' ) } else if (numberOfWorkers < 0) { throw new RangeError( @@ -138,20 +139,25 @@ export abstract class AbstractPool< } private checkPoolOptions (opts: PoolOptions): void { - this.opts.workerChoiceStrategy = - opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS - this.opts.enableEvents = opts.enableEvents ?? true - this.opts.enableTasksQueue = opts.enableTasksQueue ?? false - if (this.opts.enableTasksQueue) { - this.checkValidTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions - ) - this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions - ) + if (isPlainObject(opts)) { + this.opts.workerChoiceStrategy = + opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN + this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) + this.opts.workerChoiceStrategyOptions = + opts.workerChoiceStrategyOptions ?? + DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.enableEvents = opts.enableEvents ?? true + this.opts.enableTasksQueue = opts.enableTasksQueue ?? false + if (this.opts.enableTasksQueue) { + this.checkValidTasksQueueOptions( + opts.tasksQueueOptions as TasksQueueOptions + ) + this.opts.tasksQueueOptions = this.buildTasksQueueOptions( + opts.tasksQueueOptions as TasksQueueOptions + ) + } + } else { + throw new TypeError('Invalid pool options: must be a plain object') } } @@ -165,9 +171,22 @@ export abstract class AbstractPool< } } + private checkValidWorkerChoiceStrategyOptions ( + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + ): void { + if (!isPlainObject(workerChoiceStrategyOptions)) { + throw new TypeError( + 'Invalid worker choice strategy options: must be a plain object' + ) + } + } + private checkValidTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): void { + if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { + throw new TypeError('Invalid tasks queue options: must be a plain object') + } if ((tasksQueueOptions?.concurrency as number) <= 0) { throw new Error( `Invalid worker tasks concurrency '${ @@ -248,6 +267,7 @@ export abstract class AbstractPool< public setWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { + this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index e42d54d9..f0bc8787 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -64,12 +64,10 @@ export class FairShareWorkerChoiceStrategy< let chosenWorkerNodeKey!: number for (const [workerNodeKey] of this.pool.workerNodes.entries()) { this.computeWorkerVirtualTaskTimestamp(workerNodeKey) - const workerLastVirtualTaskEndTimestamp = + const workerVirtualTaskEndTimestamp = this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? 0 - if ( - workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp - ) { - minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp + if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) { + minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp chosenWorkerNodeKey = workerNodeKey } } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index b0750949..ad20db42 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -64,12 +64,12 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { const chosenWorkerNodeKey = this.currentWorkerNodeId - const workerTaskRunTime = this.workerVirtualTaskRunTime ?? 0 + const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime ?? 0 const workerTaskWeight = this.opts.weights?.[chosenWorkerNodeKey] ?? this.defaultWorkerWeight - if (workerTaskRunTime < workerTaskWeight) { + if (workerVirtualTaskRunTime < workerTaskWeight) { this.workerVirtualTaskRunTime = - workerTaskRunTime + + workerVirtualTaskRunTime + (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0) } else { this.currentWorkerNodeId = diff --git a/src/utils.ts b/src/utils.ts index fc328a6c..f30e175c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -32,3 +32,9 @@ export const median = (dataSet: number[]): number => { } return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2 } + +export const isPlainObject = (obj: unknown): boolean => + typeof obj === 'object' && + obj !== null && + obj?.constructor === Object && + Object.prototype.toString.call(obj) === '[object Object]' diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8b461f7d..12fb67af 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -8,7 +8,7 @@ import type { WorkerFunction, WorkerSyncFunction } from '../utility-types' -import { EMPTY_FUNCTION } from '../utils' +import { EMPTY_FUNCTION, isPlainObject } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' import { KillBehaviors } from './worker-options' @@ -107,21 +107,18 @@ export abstract class AbstractWorker< typeof taskFunctions !== 'function' && typeof taskFunctions !== 'object' ) { - throw new Error('taskFunctions parameter is not a function or an object') - } - if ( - typeof taskFunctions === 'object' && - taskFunctions.constructor !== Object && - Object.prototype.toString.call(taskFunctions) !== '[object Object]' - ) { - throw new Error('taskFunctions parameter is not an object literal') + throw new TypeError( + 'taskFunctions parameter is not a function or an object' + ) } this.taskFunctions = new Map>() - if (typeof taskFunctions !== 'function') { + if (typeof taskFunctions === 'function') { + this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this)) + } else if (isPlainObject(taskFunctions)) { let firstEntry = true for (const [name, fn] of Object.entries(taskFunctions)) { if (typeof fn !== 'function') { - throw new Error( + throw new TypeError( 'A taskFunctions parameter object value is not a function' ) } @@ -135,7 +132,7 @@ export abstract class AbstractWorker< throw new Error('taskFunctions parameter object is empty') } } else { - this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this)) + throw new TypeError('taskFunctions parameter is not an object literal') } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 81ae3649..a852bab4 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -75,7 +75,7 @@ describe('Abstract pool test suite', () => { new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js') ).toThrowError( new TypeError( - 'Cannot instantiate a pool with a non integer number of workers' + 'Cannot instantiate a pool with a non safe integer number of workers' ) ) }) diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 7f05cbb7..3c0beda3 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -62,7 +62,7 @@ describe('Abstract worker test suite', () => { new TypeError('taskFunctions parameter is not an object literal') ) expect(() => new ClusterWorker({})).toThrowError( - new TypeError('taskFunctions parameter object is empty') + new Error('taskFunctions parameter object is empty') ) }) -- 2.34.1