X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=9aeeebed772b37234c2a8f0e88c5cb45c65f0dce;hb=b0b55f57cb5e2bc363bc75d84b483c9c29a5d22f;hp=6761882589bb14bd9d205c33684ac7e7b3ab5128;hpb=440042a64e5beb9e3e97f513198f4bd2d4a896a6;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 67618825..9aeeebed 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -223,16 +223,18 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true + this.checkValidWorkerChoiceStrategy( + opts.workerChoiceStrategy as WorkerChoiceStrategy + ) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) + this.checkValidWorkerChoiceStrategyOptions( + opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + ) this.opts.workerChoiceStrategyOptions = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts.workerChoiceStrategyOptions } - this.checkValidWorkerChoiceStrategyOptions( - this.opts.workerChoiceStrategyOptions - ) this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false @@ -252,7 +254,10 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { - if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) { + if ( + workerChoiceStrategy != null && + !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) + ) { throw new Error( `Invalid worker choice strategy '${workerChoiceStrategy}'` ) @@ -262,13 +267,16 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { - if (!isPlainObject(workerChoiceStrategyOptions)) { + if ( + workerChoiceStrategyOptions != null && + !isPlainObject(workerChoiceStrategyOptions) + ) { throw new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && !Number.isSafeInteger(workerChoiceStrategyOptions.retries) ) { throw new TypeError( @@ -276,7 +284,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && workerChoiceStrategyOptions.retries < 0 ) { throw new RangeError( @@ -284,7 +292,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.weights != null && + workerChoiceStrategyOptions?.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { throw new Error( @@ -292,7 +300,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.measurement != null && + workerChoiceStrategyOptions?.measurement != null && !Object.values(Measurements).includes( workerChoiceStrategyOptions.measurement ) @@ -311,7 +319,7 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - !Number.isSafeInteger(tasksQueueOptions?.concurrency) + !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( 'Invalid worker node tasks concurrency: must be an integer' @@ -319,23 +327,23 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - tasksQueueOptions?.concurrency <= 0 + tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } if ( tasksQueueOptions?.size != null && - !Number.isSafeInteger(tasksQueueOptions?.size) + !Number.isSafeInteger(tasksQueueOptions.size) ) { throw new TypeError( 'Invalid worker node tasks queue size: must be an integer' ) } - if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) { + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -640,6 +648,8 @@ export abstract class AbstractPool< tasksQueueOptions?: TasksQueueOptions ): void { if (this.opts.enableTasksQueue === true && !enable) { + this.unsetTaskStealing() + this.unsetTasksStealingOnBackPressure() this.flushTasksQueues() } this.opts.enableTasksQueue = enable @@ -653,17 +663,21 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + if (this.opts.tasksQueueOptions.taskStealing === true) { + this.setTaskStealing() + } else { + this.unsetTaskStealing() + } + if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.setTasksStealingOnBackPressure() + } else { + this.unsetTasksStealingOnBackPressure() + } } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } - private setTasksQueueSize (size: number): void { - for (const workerNode of this.workerNodes) { - workerNode.tasksQueueBackPressureSize = size - } - } - private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { @@ -678,6 +692,38 @@ export abstract class AbstractPool< } } + private setTasksQueueSize (size: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = size + } + } + + private setTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + } + } + + private unsetTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onEmptyQueue + } + } + + private setTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } + } + + private unsetTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onBackPressure + } + } + /** * Whether the pool is full or not. * @@ -722,8 +768,8 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): Promise { - const workerId = this.getWorkerInfo(workerNodeKey).id as number return await new Promise((resolve, reject) => { + const workerId = this.getWorkerInfo(workerNodeKey).id as number this.registerWorkerMessageListener(workerNodeKey, message => { if ( message.workerId === workerId && @@ -736,9 +782,11 @@ export abstract class AbstractPool< ) { reject( new Error( - `Task function operation ${ + `Task function operation '${ message.taskFunctionOperation as string - } failed on worker ${message.workerId}` + }' failed on worker ${message.workerId} with error: '${ + message.workerError?.message as string + }'` ) ) } @@ -748,7 +796,7 @@ export abstract class AbstractPool< } private async sendTaskFunctionOperationToWorkers ( - message: Omit, 'workerId'> + message: MessageValue ): Promise { return await new Promise((resolve, reject) => { const responsesReceived = new Array>() @@ -769,11 +817,18 @@ export abstract class AbstractPool< message => message.taskFunctionOperationStatus === false ) ) { + const errorResponse = responsesReceived.find( + response => response.taskFunctionOperationStatus === false + ) reject( new Error( - `Task function operation ${ + `Task function operation '${ message.taskFunctionOperation as string - } failed on worker ${message.workerId as number}` + }' failed on worker ${ + errorResponse?.workerId as number + } with error: '${ + errorResponse?.workerError?.message as string + }'` ) ) } @@ -800,23 +855,40 @@ export abstract class AbstractPool< /** @inheritDoc */ public async addTaskFunction ( name: string, - taskFunction: TaskFunction + fn: TaskFunction ): Promise { - this.taskFunctions.set(name, taskFunction) - return await this.sendTaskFunctionOperationToWorkers({ + if (typeof name !== 'string') { + throw new TypeError('name argument must be a string') + } + if (typeof name === 'string' && name.trim().length === 0) { + throw new TypeError('name argument must not be an empty string') + } + if (typeof fn !== 'function') { + throw new TypeError('fn argument must be a function') + } + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionName: name, - taskFunction: taskFunction.toString() + taskFunction: fn.toString() }) + this.taskFunctions.set(name, fn) + return opResult } /** @inheritDoc */ public async removeTaskFunction (name: string): Promise { - this.taskFunctions.delete(name) - return await this.sendTaskFunctionOperationToWorkers({ + if (!this.taskFunctions.has(name)) { + throw new Error( + 'Cannot remove a task function not handled on the pool side' + ) + } + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', taskFunctionName: name }) + this.deleteTaskFunctionWorkerUsages(name) + this.taskFunctions.delete(name) + return opResult } /** @inheritDoc */ @@ -840,6 +912,12 @@ export abstract class AbstractPool< }) } + private deleteTaskFunctionWorkerUsages (name: string): void { + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -1195,9 +1273,9 @@ export abstract class AbstractPool< this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) if ( - this.opts.restartWorkerOnError === true && this.started && - !this.starting + !this.starting && + this.opts.restartWorkerOnError === true ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1205,7 +1283,7 @@ export abstract class AbstractPool< this.createAndSetupWorkerNode() } } - if (this.opts.enableTasksQueue === true) { + if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(workerNodeKey) } }) @@ -1473,8 +1551,8 @@ export abstract class AbstractPool< ) workerInfo.ready = message.ready as boolean workerInfo.taskFunctionNames = message.taskFunctionNames - if (this.emitter != null && this.ready) { - this.emitter.emit(PoolEvents.ready, this.info) + if (this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) } }