X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=7b0d541d8bcafaaaab1150fa01067c66acea30c2;hb=58baffd328c308bbffed882ebdc8bb68b8198b26;hp=7310e9efa2abd32e58bbb6744049e7f1573dbc34;hpb=15b176e0295f49da9ddc609e03b328cb8963eee1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7310e9ef..7b0d541d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,7 +5,8 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, @@ -290,7 +291,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -313,18 +314,26 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.queueMaxSize != null && - !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + tasksQueueOptions?.size != null ) { - throw new TypeError( - 'Invalid worker node tasks queue max size: must be an integer' + throw new Error( + 'Invalid tasks queue options: cannot specify both queueMaxSize and size' ) } + if (tasksQueueOptions?.queueMaxSize != null) { + tasksQueueOptions.size = tasksQueueOptions.queueMaxSize + } if ( - tasksQueueOptions?.queueMaxSize != null && - tasksQueueOptions.queueMaxSize <= 0 + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` + `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -641,17 +650,15 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueMaxSize( - this.opts.tasksQueueOptions.queueMaxSize as number - ) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } - private setTasksQueueMaxSize (queueMaxSize: number): void { + private setTasksQueueMaxSize (size: number): void { for (const workerNode of this.workerNodes) { - workerNode.tasksQueueBackPressureSize = queueMaxSize + workerNode.tasksQueueBackPressureSize = size } } @@ -660,7 +667,7 @@ export abstract class AbstractPool< ): TasksQueueOptions { return { ...{ - queueMaxSize: Math.pow(this.maxSize, 2), + size: Math.pow(this.maxSize, 2), concurrency: 1 }, ...tasksQueueOptions @@ -921,13 +928,6 @@ export abstract class AbstractPool< workerTaskStatistics.executing > 0 ) { --workerTaskStatistics.executing - } else if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing < 0 - ) { - throw new Error( - 'Worker usage statistic for tasks executing cannot be negative' - ) } if (message.taskError == null) { ++workerTaskStatistics.executed @@ -1156,6 +1156,8 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) this.workerNodes[workerNodeKey].onBackPressure = this.tasksStealingOnBackPressure.bind(this) } @@ -1188,43 +1190,68 @@ export abstract class AbstractPool< private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true + if (workerNode.info.ready && workerNodeId !== workerNodeKey) { + if ( + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } + if (workerNode.usage.tasks.queued === 0) { + destinationWorkerNodeKey = workerNodeId + break + } + if (workerNode.usage.tasks.queued < minQueuedTasks) { + minQueuedTasks = workerNode.usage.tasks.queued + destinationWorkerNodeKey = workerNodeId + } } - if ( - workerNodeId !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break + } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } + if (executeTask) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number } if ( - workerNodeId !== workerNodeKey && - workerNode.info.ready && - workerNode.usage.tasks.queued < minQueuedTasks + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) } - } - if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + break } } } @@ -1233,7 +1260,7 @@ export abstract class AbstractPool< const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes - .filter((workerNode) => workerNode.info.id !== workerId) + .slice() .sort( (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued @@ -1241,26 +1268,22 @@ export abstract class AbstractPool< for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( workerNode.info.ready && + workerNode.info.id !== workerId && sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + !workerNode.hasBackPressure() ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) - } else if ( - workerNode.info.ready && - sourceWorkerNode.usage.tasks.queued > 0 && - !workerNode.hasBackPressure() && - workerNode.usage.tasks.executing >= + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } + if ( + workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) - ) { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + ) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } } } } @@ -1373,7 +1396,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) {