From: Jérôme Benoit Date: Fri, 24 Nov 2023 23:51:49 +0000 (+0100) Subject: fix: fix continuous tasks stealing on idle start at worker node idling X-Git-Tag: v3.0.8~7 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=463226a44b0370911db746052bcad0bcc7878acf;p=poolifier.git fix: fix continuous tasks stealing on idle start at worker node idling Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 49935144..bb18720d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure continuous tasks stealing on idle start at worker node idling + ## [3.0.7] - 2023-11-24 ### Changed diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bf018251..a5b81c72 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1813,8 +1813,8 @@ packages: restore-cursor: 4.0.0 dev: true - /cli-spinners@2.9.1: - resolution: {integrity: sha512-jHgecW0pxkonBJdrKsqxgRX9AcG+u/5k0Q7WPDfi8AogLAdwxEkyYYNWwZ5GvVFoFx2uiY1eNcSK00fh+1+FyQ==} + /cli-spinners@2.9.2: + resolution: {integrity: sha512-ywqV+5MmyL4E7ybXgKys4DugZbX0FC6LnwrhjuykIjnK9k8OQacQ7axGKnjDXWNhns0xot3bZI5h55H8yo9cJg==} engines: {node: '>=6'} dev: true @@ -4884,7 +4884,7 @@ packages: bl: 4.1.0 chalk: 4.1.2 cli-cursor: 3.1.0 - cli-spinners: 2.9.1 + cli-spinners: 2.9.2 is-interactive: 1.0.0 is-unicode-supported: 0.1.0 log-symbols: 4.1.0 @@ -4898,7 +4898,7 @@ packages: dependencies: chalk: 5.3.0 cli-cursor: 4.0.0 - cli-spinners: 2.9.1 + cli-spinners: 2.9.2 is-interactive: 2.0.0 is-unicode-supported: 1.3.0 log-symbols: 5.1.0 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 8c2dcc31..ab319c62 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -12,12 +12,14 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, average, + exponentialDelay, isKillBehavior, isPlainObject, max, median, min, - round + round, + sleep } from '../utils' import { KillBehaviors } from '../worker/worker-options' import type { TaskFunction } from '../worker/task-functions' @@ -1478,11 +1480,79 @@ export abstract class AbstractPool< } } + private updateTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.sequentiallyStolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + taskName + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } + } + + private resetTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode?.usage != null) { + workerNode.usage.tasks.sequentiallyStolen = 0 + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + taskName + ) as WorkerUsage + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } + } + private readonly handleIdleWorkerNodeEvent = ( - event: CustomEvent + event: CustomEvent, + previousStolenTask?: Task ): void => { - const { workerId } = event.detail - const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const { workerNodeKey } = event.detail + if (workerNodeKey == null) { + throw new Error( + 'WorkerNode event detail workerNodeKey attribute must be defined' + ) + } + const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + if ( + previousStolenTask != null && + workerNodeTasksUsage.sequentiallyStolen > 0 && + (workerNodeTasksUsage.executing > 0 || + this.tasksQueueSize(workerNodeKey) > 0) + ) { + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + previousStolenTask.name as string + ) + return + } + const stolenTask = this.workerNodeStealTask(workerNodeKey) + sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) + .then(() => { + this.handleIdleWorkerNodeEvent(event, stolenTask) + return undefined + }) + .catch(EMPTY_FUNCTION) + } + + private readonly workerNodeStealTask = ( + workerNodeKey: number + ): Task | undefined => { const workerNodes = this.workerNodes .slice() .sort( @@ -1490,22 +1560,27 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) const sourceWorkerNode = workerNodes.find( - workerNode => - workerNode.info.ready && - workerNode.info.id !== workerId && - workerNode.usage.tasks.queued > 0 + (sourceWorkerNode, sourceWorkerNodeKey) => + sourceWorkerNode.info.ready && + sourceWorkerNodeKey !== workerNodeKey && + sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) } else { - this.enqueueTask(destinationWorkerNodeKey, task) + this.enqueueTask(workerNodeKey, task) } + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + task.name as string + ) this.updateTaskStolenStatisticsWorkerUsage( - destinationWorkerNodeKey, + workerNodeKey, task.name as string ) + return task } } @@ -1567,16 +1642,15 @@ export abstract class AbstractPool< } private handleWorkerReadyResponse (message: MessageValue): void { - if (message.ready === false) { - throw new Error( - `Worker ${message.workerId as number} failed to initialize` - ) + const { workerId, ready, taskFunctionNames } = message + if (ready === false) { + throw new Error(`Worker ${workerId as number} failed to initialize`) } const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) + this.getWorkerNodeKeyByWorkerId(workerId) ) - workerInfo.ready = message.ready as boolean - workerInfo.taskFunctionNames = message.taskFunctionNames + workerInfo.ready = ready as boolean + workerInfo.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) @@ -1584,7 +1658,7 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, workerError, data } = message + const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { const { resolve, reject, workerNodeKey } = promiseResponse @@ -1597,16 +1671,29 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) - if ( - this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 && - this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (this.opts.enableTasksQueue === true) { + const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + if ( + this.tasksQueueSize(workerNodeKey) > 0 && + workerNodeTasksUsage.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask( + workerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + if ( + workerNodeTasksUsage.executing === 0 && + this.tasksQueueSize(workerNodeKey) === 0 && + workerNodeTasksUsage.sequentiallyStolen === 0 + ) { + this.workerNodes[workerNodeKey].dispatchEvent( + new CustomEvent('idleWorkerNode', { + detail: { workerId: workerId as number, workerNodeKey } + }) + ) + } } } } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 34a647fb..3de3cb80 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,14 +1,7 @@ import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import type { Task } from '../utility-types' -import { - DEFAULT_TASK_NAME, - EMPTY_FUNCTION, - exponentialDelay, - getWorkerId, - getWorkerType, - sleep -} from '../utils' +import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils' import { Deque } from '../deque' import { type IWorker, @@ -45,7 +38,6 @@ export class WorkerNode public tasksQueueBackPressureSize: number private readonly tasksQueue: Deque> private onBackPressureStarted: boolean - private onIdleWorkerNodeCount: number private readonly taskFunctionsUsage: Map /** @@ -66,7 +58,6 @@ export class WorkerNode this.tasksQueueBackPressureSize = tasksQueueBackPressureSize this.tasksQueue = new Deque>() this.onBackPressureStarted = false - this.onIdleWorkerNodeCount = 0 this.taskFunctionsUsage = new Map() } @@ -107,20 +98,12 @@ export class WorkerNode /** @inheritdoc */ public dequeueTask (): Task | undefined { - const task = this.tasksQueue.shift() - if (this.isIdle() && this.onIdleWorkerNodeCount === 0) { - this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION) - } - return task + return this.tasksQueue.shift() } /** @inheritdoc */ public popTask (): Task | undefined { - const task = this.tasksQueue.pop() - if (this.isIdle() && this.onIdleWorkerNodeCount === 0) { - this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION) - } - return task + return this.tasksQueue.pop() } /** @inheritdoc */ @@ -179,28 +162,6 @@ export class WorkerNode return this.taskFunctionsUsage.delete(name) } - private async startOnIdleWorkerNode (): Promise { - if ( - this.onIdleWorkerNodeCount > 0 && - (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0) - ) { - this.onIdleWorkerNodeCount = 0 - return - } - ++this.onIdleWorkerNodeCount - this.dispatchEvent( - new CustomEvent('idleWorkerNode', { - detail: { workerId: this.info.id as number } - }) - ) - await sleep(exponentialDelay(this.onIdleWorkerNodeCount)) - await this.startOnIdleWorkerNode() - } - - private isIdle (): boolean { - return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0 - } - private initWorkerInfo (worker: Worker): WorkerInfo { return { id: getWorkerId(worker), @@ -227,6 +188,7 @@ export class WorkerNode get maxQueued (): number { return getTasksQueueMaxSize() }, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -268,6 +230,7 @@ export class WorkerNode get queued (): number { return getTaskFunctionQueueSize() }, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, diff --git a/src/pools/worker.ts b/src/pools/worker.ts index b3e02f3c..776986d5 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -104,6 +104,10 @@ export interface TaskStatistics { * Maximum number of queued tasks. */ readonly maxQueued?: number + /** + * Number of sequentially stolen tasks. + */ + sequentiallyStolen: number /** * Number of stolen tasks. */ @@ -223,6 +227,7 @@ export interface IWorker { */ export interface WorkerNodeEventDetail { workerId: number + workerNodeKey?: number } /** diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1246a7d0..04e15c09 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -807,6 +807,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -972,6 +973,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -999,6 +1001,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1040,6 +1043,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1075,6 +1079,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1342,6 +1347,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -1516,6 +1522,7 @@ describe('Abstract pool test suite', () => { executing: 0, failed: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0 }, runTime: { diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index c9b528b9..a79be4a3 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -130,6 +130,7 @@ describe('Fixed cluster pool test suite', () => { expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0) expect(workerNode.usage.tasks.stolen).toBe(0) } expect(queuePool.info.executedTasks).toBe(0) @@ -157,6 +158,12 @@ describe('Fixed cluster pool test suite', () => { expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual( + 0 + ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index fc8a6feb..4a2ace04 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -278,6 +278,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -333,6 +334,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -558,6 +560,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -616,6 +619,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -755,6 +759,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -823,6 +828,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -972,6 +978,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -1046,6 +1053,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -1201,6 +1209,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1286,6 +1295,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1376,6 +1386,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1576,6 +1587,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1654,6 +1666,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1737,6 +1750,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1999,6 +2013,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -2084,6 +2099,7 @@ describe('Selection strategies test suite', () => { queued: 0, maxQueued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: expect.objectContaining({ diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index 6b46512c..6a435ffe 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -130,6 +130,7 @@ describe('Fixed thread pool test suite', () => { expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0) expect(workerNode.usage.tasks.stolen).toBe(0) } expect(queuePool.info.executedTasks).toBe(0) @@ -157,6 +158,12 @@ describe('Fixed thread pool test suite', () => { expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual( + 0 + ) + expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual( + numberOfThreads * maxMultiplier + ) expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual( numberOfThreads * maxMultiplier diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index b72c2887..834c3cb6 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -58,6 +58,7 @@ describe('Worker node test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -84,7 +85,6 @@ describe('Worker node test suite', () => { threadWorkerNode.tasksQueue.size ) expect(threadWorkerNode.onBackPressureStarted).toBe(false) - expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) @@ -101,6 +101,7 @@ describe('Worker node test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -127,7 +128,6 @@ describe('Worker node test suite', () => { clusterWorkerNode.tasksQueue.size ) expect(clusterWorkerNode.onBackPressureStarted).toBe(false) - expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) @@ -156,6 +156,7 @@ describe('Worker node test suite', () => { executing: 0, queued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -179,6 +180,7 @@ describe('Worker node test suite', () => { executing: 0, queued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: { @@ -202,6 +204,7 @@ describe('Worker node test suite', () => { executing: 0, queued: 0, stolen: 0, + sequentiallyStolen: 0, failed: 0 }, runTime: {