X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=a4d83b593cd52b4927aa480f557227e788d6cfeb;hb=0717df3dc168fc35b8268713ac8b7bde86f024a9;hp=1aa1a13413f807019352907d2dac7a635ab665d1;hpb=a25c335d7e5573a171fc96c445cb6092852b2c4a;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1aa1a134..a4d83b59 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -93,6 +93,10 @@ export abstract class AbstractPool< * Whether the pool is starting or not. */ private readonly starting: boolean + /** + * Whether the pool is started or not. + */ + private started: boolean /** * The start timestamp of the pool. */ @@ -141,6 +145,7 @@ export abstract class AbstractPool< this.starting = true this.startPool() this.starting = false + this.started = true this.startTimestamp = performance.now() } @@ -179,7 +184,7 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { if (max == null) { - throw new Error( + throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) } else if (!Number.isSafeInteger(max)) { @@ -295,15 +300,31 @@ export abstract class AbstractPool< !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + 'Invalid worker node tasks concurrency: must be an integer' ) } if ( tasksQueueOptions?.concurrency != null && tasksQueueOptions.concurrency <= 0 ) { - throw new Error( - `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + throw new RangeError( + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + tasksQueueOptions.queueMaxSize <= 0 + ) { + throw new RangeError( + `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` ) } } @@ -620,16 +641,29 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueMaxSize( + this.opts.tasksQueueOptions.queueMaxSize as number + ) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } + private setTasksQueueMaxSize (queueMaxSize: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = queueMaxSize + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...{ + queueMaxSize: Math.pow(this.maxSize, 2), + concurrency: 1 + }, + ...tasksQueueOptions } } @@ -694,6 +728,9 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on destroyed pool')) + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } @@ -754,6 +791,7 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + this.started = false } protected async sendKillMessageToWorker ( @@ -850,7 +888,7 @@ export abstract class AbstractPool< const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME + message.taskPerformance?.name as string ) as WorkerUsage this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) @@ -883,13 +921,6 @@ export abstract class AbstractPool< workerTaskStatistics.executing > 0 ) { --workerTaskStatistics.executing - } else if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing < 0 - ) { - throw new Error( - 'Worker usage statistic for tasks executing cannot be negative' - ) } if (message.taskError == null) { ++workerTaskStatistics.executed @@ -1021,7 +1052,11 @@ export abstract class AbstractPool< workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { + if ( + this.opts.restartWorkerOnError === true && + !this.starting && + this.started + ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() } else { @@ -1113,6 +1148,10 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1141,29 +1180,26 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + const workerNodes = this.workerNodes.filter( + (_, workerNodeId) => workerNodeId !== workerNodeKey + ) while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo + for (const [workerNodeId, workerNode] of workerNodes.entries()) { if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued === 0 + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } + executeTask = true + } + if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { targetWorkerNodeKey = workerNodeId break } if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && + workerNode.info.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued @@ -1184,6 +1220,39 @@ export abstract class AbstractPool< } } + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .filter((workerNode) => workerNode.info.id !== workerId) + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() + ) { + if ( + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } else { + this.enqueueTask( + workerNodeKey, + sourceWorkerNode.popTask() as Task + ) + } + } + } + } + /** * This method is the listener registered for each worker message. * @@ -1292,7 +1361,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.maxSize + this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) {