From 68cbdc846878bc058323b757a68b4c83eedc6388 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 25 Aug 2023 16:27:32 +0200 Subject: [PATCH] feat: continuous task stealing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 ++ src/pools/abstract-pool.ts | 22 ++++++ src/pools/pool.ts | 1 + src/pools/worker-node.ts | 28 +++++++- src/pools/worker.ts | 4 ++ src/utils.ts | 53 ++++++++++----- tests/pools/abstract/abstract-pool.test.js | 9 ++- .../selection-strategies.test.js | 68 ++++++++++++------- 8 files changed, 144 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e8866ff..653c06a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Update simple moving average implementation to use a circular buffer. - Update simple moving median implementation to use a circular buffer. +- Account for stolen tasks in worker usage statistics and pool information. + +### Added + +- Continuous tasks stealing algorithm. ## [2.6.34] - 2023-08-24 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 062df077..4abbd615 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -405,6 +405,13 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { backPressure: this.hasBackPressure() }), + ...(this.opts.enableTasksQueue === true && { + stolenTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.stolen, + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -1262,6 +1269,14 @@ export abstract class AbstractPool< } else { this.enqueueTask(destinationWorkerNodeKey, task) } + ++destinationWorkerNode.usage.tasks.stolen + if (this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey)) { + const taskFunctionWorkerUsage = + destinationWorkerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } break } } @@ -1297,6 +1312,13 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } + ++workerNode.usage.tasks.stolen + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.stolen + } } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index cc028fec..4463560c 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -80,6 +80,7 @@ export interface PoolInfo { readonly queuedTasks?: number readonly maxQueuedTasks?: number readonly backPressure?: boolean + readonly stolenTasks?: number readonly failedTasks: number readonly runTime?: { readonly minimum: number diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index e79aed0b..e28372ef 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,7 +1,12 @@ import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import type { Task } from '../utility-types' -import { DEFAULT_TASK_NAME } from '../utils' +import { + DEFAULT_TASK_NAME, + EMPTY_FUNCTION, + exponentialDelay, + sleep +} from '../utils' import { Deque } from '../deque' import { type IWorker, @@ -36,6 +41,7 @@ implements IWorkerNode { public onEmptyQueue?: (workerId: number) => void private readonly taskFunctionsUsage: Map private readonly tasksQueue: Deque> + private onEmptyQueueCount: number /** * Constructs a new worker node. @@ -76,6 +82,7 @@ implements IWorkerNode { this.taskFunctionsUsage = new Map() this.tasksQueue = new Deque>() this.tasksQueueBackPressureSize = tasksQueueBackPressureSize + this.onEmptyQueueCount = 0 } /** @inheritdoc */ @@ -105,7 +112,7 @@ implements IWorkerNode { public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { - this.onEmptyQueue(this.info.id as number) + this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task } @@ -114,7 +121,7 @@ implements IWorkerNode { public popTask (): Task | undefined { const task = this.tasksQueue.pop() if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { - this.onEmptyQueue(this.info.id as number) + this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task } @@ -170,6 +177,19 @@ implements IWorkerNode { return this.taskFunctionsUsage.get(name) } + private async startOnEmptyQueue (): Promise { + if (this.onEmptyQueue != null) { + if (this.tasksQueue.size > 0) { + this.onEmptyQueueCount = 0 + return + } + this.onEmptyQueue(this.info.id as number) + ++this.onEmptyQueueCount + await sleep(exponentialDelay(this.onEmptyQueueCount)) + await this.startOnEmptyQueue() + } + } + private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { return { id: this.getWorkerId(worker, workerType), @@ -196,6 +216,7 @@ implements IWorkerNode { get maxQueued (): number { return getTasksQueueMaxSize() }, + stolen: 0, failed: 0 }, runTime: { @@ -236,6 +257,7 @@ implements IWorkerNode { get queued (): number { return getTaskFunctionQueueSize() }, + stolen: 0, failed: 0 }, runTime: { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 6e81112d..52b56a24 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -96,6 +96,10 @@ export interface TaskStatistics { * Maximum number of queued tasks. */ readonly maxQueued?: number + /** + * Number of stolen tasks. + */ + stolen: number /** * Number of failed tasks. */ diff --git a/src/utils.ts b/src/utils.ts index d39bfa02..b5fb9afe 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -59,22 +59,34 @@ export const availableParallelism = (): number => { return availableParallelism } -// /** -// * Computes the retry delay in milliseconds using an exponential back off algorithm. -// * -// * @param retryNumber - The number of retries that have already been attempted -// * @param maxDelayRatio - The maximum ratio of the delay that can be randomized -// * @returns Delay in milliseconds -// * @internal -// */ -// export const exponentialDelay = ( -// retryNumber = 0, -// maxDelayRatio = 0.2 -// ): number => { -// const delay = Math.pow(2, retryNumber) * 100 -// const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay -// return delay + randomSum -// } +/** + * Sleeps for the given amount of milliseconds. + * + * @param ms - The amount of milliseconds to sleep. + * @returns A promise that resolves after the given amount of milliseconds. + */ +export const sleep = async (ms: number): Promise => { + await new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +/** + * Computes the retry delay in milliseconds using an exponential back off algorithm. + * + * @param retryNumber - The number of retries that have already been attempted + * @param maxDelayRatio - The maximum ratio of the delay that can be randomized + * @returns Delay in milliseconds + * @internal + */ +export const exponentialDelay = ( + retryNumber = 0, + maxDelayRatio = 0.2 +): number => { + const delay = Math.pow(2, retryNumber) * 100 + const randomSum = delay * maxDelayRatio * secureRandom() // 0-(maxDelayRatio*100)% of the delay + return delay + randomSum +} /** * Computes the average of the given data set. @@ -234,3 +246,12 @@ export const once = ( } } } + +/** + * Generate a cryptographically secure random number in the [0,1[ range + * + * @returns A number in the [0,1[ range + */ +const secureRandom = (): number => { + return crypto.getRandomValues(new Uint32Array(1))[0] / 0x100000000 +} diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 7f7c4dce..933e1f94 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -618,6 +618,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -710,6 +711,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -736,6 +738,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -776,6 +779,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -810,6 +814,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -977,6 +982,7 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: expect.any(Number), queuedTasks: expect.any(Number), backPressure: true, + stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) expect(pool.hasBackPressure.called).toBe(true) @@ -1040,7 +1046,8 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: expect.any(Number), failed: 0, - queued: 0 + queued: 0, + stolen: 0 }, runTime: { history: expect.any(CircularArray) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 0e47e18e..77713b6f 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -217,6 +217,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -265,6 +266,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -464,6 +466,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -511,6 +514,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -639,6 +643,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -696,6 +701,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -834,6 +840,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -843,12 +850,12 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }, elu: { - idle: expect.objectContaining({ + idle: { history: expect.any(CircularArray) - }), - active: expect.objectContaining({ + }, + active: { history: expect.any(CircularArray) - }) + } } }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) @@ -887,6 +894,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -896,12 +904,12 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }, elu: { - idle: expect.objectContaining({ + idle: { history: expect.any(CircularArray) - }), - active: expect.objectContaining({ + }, + active: { history: expect.any(CircularArray) - }) + } } }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) @@ -1021,21 +1029,22 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: expect.objectContaining({ + runTime: { history: expect.any(CircularArray) - }), + }, waitTime: { history: expect.any(CircularArray) }, elu: { - idle: expect.objectContaining({ + idle: { history: expect.any(CircularArray) - }), - active: expect.objectContaining({ + }, + active: { history: expect.any(CircularArray) - }) + } } }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) @@ -1089,21 +1098,22 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: expect.objectContaining({ + runTime: { history: expect.any(CircularArray) - }), + }, waitTime: { history: expect.any(CircularArray) }, elu: { - idle: expect.objectContaining({ + idle: { history: expect.any(CircularArray) - }), - active: expect.objectContaining({ + }, + active: { history: expect.any(CircularArray) - }) + } } }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) @@ -1162,21 +1172,22 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, - runTime: expect.objectContaining({ + runTime: { history: expect.any(CircularArray) - }), + }, waitTime: { history: expect.any(CircularArray) }, elu: { - idle: expect.objectContaining({ + idle: { history: expect.any(CircularArray) - }), - active: expect.objectContaining({ + }, + active: { history: expect.any(CircularArray) - }) + } } }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) @@ -1385,6 +1396,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1452,6 +1464,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1524,6 +1537,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: expect.objectContaining({ @@ -1755,6 +1769,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -1825,6 +1840,7 @@ describe('Selection strategies test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { -- 2.34.1