X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=3a1a87807cc065df871bb1ca6d2195619672a64d;hb=ff3f866a041b682830639663364c389628095658;hp=f0119f4ed3c8135e596d171a0f6ec093d7b29935;hpb=72695f86742c18357f42f000ce6488d545187133;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f0119f4e..3a1a8780 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,7 +5,8 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, @@ -93,6 +94,10 @@ export abstract class AbstractPool< * Whether the pool is starting or not. */ private readonly starting: boolean + /** + * Whether the pool is started or not. + */ + private started: boolean /** * The start timestamp of the pool. */ @@ -141,6 +146,7 @@ export abstract class AbstractPool< this.starting = true this.startPool() this.starting = false + this.started = true this.startTimestamp = performance.now() } @@ -285,7 +291,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -308,18 +314,26 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.queueMaxSize != null && - !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + tasksQueueOptions?.size != null ) { - throw new TypeError( - 'Invalid worker node tasks queue max size: must be an integer' + throw new Error( + 'Invalid tasks queue options: cannot specify both queueMaxSize and size' ) } + if (tasksQueueOptions?.queueMaxSize != null) { + tasksQueueOptions.size = tasksQueueOptions.queueMaxSize + } if ( - tasksQueueOptions?.queueMaxSize != null && - tasksQueueOptions.queueMaxSize <= 0 + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` + `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -636,17 +650,15 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueMaxSize( - this.opts.tasksQueueOptions.queueMaxSize as number - ) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } - private setTasksQueueMaxSize (queueMaxSize: number): void { + private setTasksQueueMaxSize (size: number): void { for (const workerNode of this.workerNodes) { - workerNode.tasksQueueBackPressureSize = queueMaxSize + workerNode.tasksQueueBackPressureSize = size } } @@ -655,7 +667,7 @@ export abstract class AbstractPool< ): TasksQueueOptions { return { ...{ - queueMaxSize: Math.pow(this.maxSize, 2), + size: Math.pow(this.maxSize, 2), concurrency: 1 }, ...tasksQueueOptions @@ -723,6 +735,9 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on destroyed pool')) + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } @@ -783,6 +798,7 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + this.started = false } protected async sendKillMessageToWorker ( @@ -912,13 +928,6 @@ export abstract class AbstractPool< workerTaskStatistics.executing > 0 ) { --workerTaskStatistics.executing - } else if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing < 0 - ) { - throw new Error( - 'Worker usage statistic for tasks executing cannot be negative' - ) } if (message.taskError == null) { ++workerTaskStatistics.executed @@ -1050,7 +1059,11 @@ export abstract class AbstractPool< workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { + if ( + this.opts.restartWorkerOnError === true && + !this.starting && + this.started + ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() } else { @@ -1174,27 +1187,25 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + const workerNodes = this.workerNodes.filter( + (_, workerNodeId) => workerNodeId !== workerNodeKey + ) while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + for (const [workerNodeId, workerNode] of workerNodes.entries()) { if ( - this.workerNodes[workerNodeId].usage.tasks.executing < + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if ( - workerNodeId !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued === 0 - ) { + if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { targetWorkerNodeKey = workerNodeId break } if ( - workerNodeId !== workerNodeKey && workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { @@ -1205,12 +1216,12 @@ export abstract class AbstractPool< if (executeTask) { this.executeTask( targetWorkerNodeKey, - this.popTask(workerNodeKey) as Task + this.dequeueTask(workerNodeKey) as Task ) } else { this.enqueueTask( targetWorkerNodeKey, - this.popTask(workerNodeKey) as Task + this.dequeueTask(workerNodeKey) as Task ) } } @@ -1229,25 +1240,22 @@ export abstract class AbstractPool< if ( workerNode.info.ready && sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + !workerNode.hasBackPressure() ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) - } else if ( - workerNode.info.ready && - sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing >= + if ( + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) - ) { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } } } } @@ -1360,7 +1368,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) { @@ -1426,10 +1434,6 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].dequeueTask() } - private popTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].popTask() - } - private tasksQueueSize (workerNodeKey: number): number { return this.workerNodes[workerNodeKey].tasksQueueSize() }