X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6081902720554c2bcdb809d37f04d5f93ed89da7;hb=79b197bb222403411621c41775d9d640f4150633;hp=a6ffcbbb3959caa7ac2480656a25798fa5223856;hpb=033f1776d603b75ce8dcd763278a1ce8fca5c479;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a6ffcbbb..60819027 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -405,6 +405,13 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { backPressure: this.hasBackPressure() }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -1253,18 +1260,29 @@ export abstract class AbstractPool< ...(sourceWorkerNode.popTask() as Task), workerId: destinationWorkerNode.info.id as number } - // Enqueue task for continuous task stealing - this.enqueueTask(destinationWorkerNodeKey, task) - // Avoid starvation if ( - this.tasksQueueSize(destinationWorkerNodeKey) > 0 && + this.tasksQueueSize(destinationWorkerNodeKey) === 0 && destinationWorkerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) ) { - this.executeTask( - destinationWorkerNodeKey, - this.dequeueTask(destinationWorkerNodeKey) as Task - ) + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + if (destinationWorkerNode?.usage != null) { + ++destinationWorkerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) && + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { + const taskFunctionWorkerUsage = + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen } break } @@ -1301,6 +1319,18 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.stolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(task.name as string) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } } } }