X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=7b0d541d8bcafaaaab1150fa01067c66acea30c2;hb=58baffd328c308bbffed882ebdc8bb68b8198b26;hp=1926a4a426fbcb84724ab55749778ff95c5910ee;hpb=f5d7178c9bf5e9c77fb69d191a2eadaf08539839;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1926a4a4..7b0d541d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,7 +5,8 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, @@ -93,6 +94,10 @@ export abstract class AbstractPool< * Whether the pool is starting or not. */ private readonly starting: boolean + /** + * Whether the pool is started or not. + */ + private started: boolean /** * The start timestamp of the pool. */ @@ -141,6 +146,7 @@ export abstract class AbstractPool< this.starting = true this.startPool() this.starting = false + this.started = true this.startTimestamp = performance.now() } @@ -285,7 +291,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -308,18 +314,26 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.queueMaxSize != null && - !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + tasksQueueOptions?.size != null ) { - throw new TypeError( - 'Invalid worker node tasks queue max size: must be an integer' + throw new Error( + 'Invalid tasks queue options: cannot specify both queueMaxSize and size' ) } + if (tasksQueueOptions?.queueMaxSize != null) { + tasksQueueOptions.size = tasksQueueOptions.queueMaxSize + } if ( - tasksQueueOptions?.queueMaxSize != null && - tasksQueueOptions.queueMaxSize <= 0 + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` + `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -636,17 +650,15 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueMaxSize( - this.opts.tasksQueueOptions.queueMaxSize as number - ) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } - private setTasksQueueMaxSize (queueMaxSize: number): void { + private setTasksQueueMaxSize (size: number): void { for (const workerNode of this.workerNodes) { - workerNode.tasksQueueBackPressureSize = queueMaxSize + workerNode.tasksQueueBackPressureSize = size } } @@ -655,7 +667,7 @@ export abstract class AbstractPool< ): TasksQueueOptions { return { ...{ - queueMaxSize: Math.pow(this.maxSize, 2), + size: Math.pow(this.maxSize, 2), concurrency: 1 }, ...tasksQueueOptions @@ -723,6 +735,9 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on destroyed pool')) + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } @@ -783,6 +798,7 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + this.started = false } protected async sendKillMessageToWorker ( @@ -912,13 +928,6 @@ export abstract class AbstractPool< workerTaskStatistics.executing > 0 ) { --workerTaskStatistics.executing - } else if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing < 0 - ) { - throw new Error( - 'Worker usage statistic for tasks executing cannot be negative' - ) } if (message.taskError == null) { ++workerTaskStatistics.executed @@ -1050,7 +1059,11 @@ export abstract class AbstractPool< workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { + if ( + this.opts.restartWorkerOnError === true && + !this.starting && + this.started + ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() } else { @@ -1142,6 +1155,12 @@ export abstract class AbstractPool< this.sendStartupMessageToWorker(workerNodeKey) // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -1171,44 +1190,100 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo - if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued === 0 - ) { + if (workerNode.info.ready && workerNodeId !== workerNodeKey) { if ( - this.workerNodes[workerNodeId].usage.tasks.executing < + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - targetWorkerNodeKey = workerNodeId - break + if (workerNode.usage.tasks.queued === 0) { + destinationWorkerNodeKey = workerNodeId + break + } + if (workerNode.usage.tasks.queued < minQueuedTasks) { + minQueuedTasks = workerNode.usage.tasks.queued + destinationWorkerNodeKey = workerNodeId + } + } + } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } + if (executeTask) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number } if ( - workerNodeId !== workerNodeKey && - workerInfo.ready && - workerNode.usage.tasks.queued < minQueuedTasks + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) } + break } - if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + workerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } + if ( + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } } } } @@ -1321,7 +1396,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) {