X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c6712d198bc28a9d77c08ae817e5c96c3b9227c6;hb=255d48dfda3aae693d228d0c1caea6f9ab296b8b;hp=d1bb5bc4e0fa1c8fc2dc1f8d650f81efe66cfec8;hpb=4de3d785b5e8987cab449269a73b7dfa42aca0cc;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d1bb5bc4..c6712d19 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,7 +5,8 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, @@ -290,7 +291,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -313,18 +314,26 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.queueMaxSize != null && - !Number.isSafeInteger(tasksQueueOptions.queueMaxSize) + tasksQueueOptions?.size != null ) { - throw new TypeError( - 'Invalid worker node tasks queue max size: must be an integer' + throw new Error( + 'Invalid tasks queue options: cannot specify both queueMaxSize and size' ) } + if (tasksQueueOptions?.queueMaxSize != null) { + tasksQueueOptions.size = tasksQueueOptions.queueMaxSize + } if ( - tasksQueueOptions?.queueMaxSize != null && - tasksQueueOptions.queueMaxSize <= 0 + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero` + `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -641,17 +650,15 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueMaxSize( - this.opts.tasksQueueOptions.queueMaxSize as number - ) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } - private setTasksQueueMaxSize (queueMaxSize: number): void { + private setTasksQueueMaxSize (size: number): void { for (const workerNode of this.workerNodes) { - workerNode.tasksQueueBackPressureSize = queueMaxSize + workerNode.tasksQueueBackPressureSize = size } } @@ -660,7 +667,7 @@ export abstract class AbstractPool< ): TasksQueueOptions { return { ...{ - queueMaxSize: Math.pow(this.maxSize, 2), + size: Math.pow(this.maxSize, 2), concurrency: 1 }, ...tasksQueueOptions @@ -1149,6 +1156,8 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) this.workerNodes[workerNodeKey].onBackPressure = this.tasksStealingOnBackPressure.bind(this) } @@ -1180,42 +1189,77 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { - const workerNodes = this.workerNodes.filter( - (_, workerNodeId) => workerNodeId !== workerNodeKey - ) while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false - for (const [workerNodeId, workerNode] of workerNodes.entries()) { + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { executeTask = true } - if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) { - targetWorkerNodeKey = workerNodeId + if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { + destinationWorkerNodeKey = workerNodeId break } if ( workerNode.info.ready && + workerNodeId !== workerNodeKey && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + destinationWorkerNodeKey = workerNodeId } } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.executeTask(destinationWorkerNodeKey, task) } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + if ( + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + break } } } @@ -1224,7 +1268,7 @@ export abstract class AbstractPool< const sourceWorkerNode = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes - .filter((workerNode) => workerNode.info.id !== workerId) + .slice() .sort( (workerNodeA, workerNodeB) => workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued @@ -1232,22 +1276,21 @@ export abstract class AbstractPool< for (const [workerNodeKey, workerNode] of workerNodes.entries()) { if ( workerNode.info.ready && + workerNode.info.id !== workerId && sourceWorkerNode.usage.tasks.queued > 0 && !workerNode.hasBackPressure() ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } if ( workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { - this.executeTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.executeTask(workerNodeKey, task) } else { - this.enqueueTask( - workerNodeKey, - sourceWorkerNode.popTask() as Task - ) + this.enqueueTask(workerNodeKey, task) } } } @@ -1361,7 +1404,7 @@ export abstract class AbstractPool< const workerNode = new WorkerNode( worker, this.worker, - this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) ) // Flag the worker node as ready at pool startup. if (this.starting) {