From 9f95d5ebb5081a7f52a6cbd6a5a7c703081296da Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 13 Oct 2023 21:38:49 +0200 Subject: [PATCH] feat: make worker node an event target MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/index.ts | 1 - src/pools/abstract-pool.ts | 53 +++++++++++++++++++++--------- src/pools/worker-node.ts | 52 +++++++++++++---------------- src/pools/worker.ts | 18 ++++------ tests/pools/abstract-pool.test.mjs | 22 ------------- 5 files changed, 66 insertions(+), 80 deletions(-) diff --git a/src/index.ts b/src/index.ts index f3bbe503..1e5ecbff 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,7 +26,6 @@ export type { StrategyData, TaskStatistics, WorkerInfo, - WorkerNodeEventCallback, WorkerType, WorkerUsage } from './pools/worker' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 08a955a9..ba87b704 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -34,6 +34,7 @@ import type { IWorker, IWorkerNode, WorkerInfo, + WorkerNodeEventDetail, WorkerType, WorkerUsage } from './worker' @@ -619,27 +620,37 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].onEmptyQueue = - this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].addEventListener( + 'emptyqueue', + this.handleEmptyQueueEvent + ) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - delete this.workerNodes[workerNodeKey].onEmptyQueue + this.workerNodes[workerNodeKey].removeEventListener( + 'emptyqueue', + this.handleEmptyQueueEvent + ) } } private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].onBackPressure = - this.tasksStealingOnBackPressure.bind(this) + this.workerNodes[workerNodeKey].addEventListener( + 'backpressure', + this.handleBackPressureEvent + ) } } private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - delete this.workerNodes[workerNodeKey].onBackPressure + this.workerNodes[workerNodeKey].removeEventListener( + 'backpressure', + this.handleBackPressureEvent + ) } } @@ -1353,12 +1364,16 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].onEmptyQueue = - this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].addEventListener( + 'emptyqueue', + this.handleEmptyQueueEvent.bind(this) + ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].onBackPressure = - this.tasksStealingOnBackPressure.bind(this) + this.workerNodes[workerNodeKey].addEventListener( + 'backpressure', + this.handleBackPressureEvent.bind(this) + ) } } } @@ -1427,8 +1442,12 @@ export abstract class AbstractPool< } } - private taskStealingOnEmptyQueue (workerId: number): void { - const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + private readonly handleEmptyQueueEvent = ( + event: CustomEvent + ): void => { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + event.detail.workerId + ) const workerNodes = this.workerNodes .slice() .sort( @@ -1438,7 +1457,7 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( workerNode => workerNode.info.ready && - workerNode.info.id !== workerId && + workerNode.info.id !== event.detail.workerId && workerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { @@ -1455,13 +1474,15 @@ export abstract class AbstractPool< } } - private tasksStealingOnBackPressure (workerId: number): void { + private readonly handleBackPressureEvent = ( + event: CustomEvent + ): void => { const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return } const sourceWorkerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)] const workerNodes = this.workerNodes .slice() .sort( @@ -1472,7 +1493,7 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && - workerNode.info.id !== workerId && + workerNode.info.id !== event.detail.workerId && workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 6eb2d646..718fd489 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -15,7 +15,7 @@ import { type IWorkerNode, type StrategyData, type WorkerInfo, - type WorkerNodeEventCallback, + type WorkerNodeEventDetail, type WorkerType, WorkerTypes, type WorkerUsage @@ -29,7 +29,8 @@ import { checkWorkerNodeArguments } from './utils' * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. */ export class WorkerNode -implements IWorkerNode { + extends EventTarget + implements IWorkerNode { /** @inheritdoc */ public readonly worker: Worker /** @inheritdoc */ @@ -42,10 +43,6 @@ implements IWorkerNode { public messageChannel?: MessageChannel /** @inheritdoc */ public tasksQueueBackPressureSize: number - /** @inheritdoc */ - public onBackPressure?: WorkerNodeEventCallback - /** @inheritdoc */ - public onEmptyQueue?: WorkerNodeEventCallback private readonly tasksQueue: Deque> private onBackPressureStarted: boolean private onEmptyQueueCount: number @@ -58,6 +55,7 @@ implements IWorkerNode { * @param tasksQueueBackPressureSize - The tasks queue back pressure size. */ constructor (worker: Worker, tasksQueueBackPressureSize: number) { + super() checkWorkerNodeArguments(worker, tasksQueueBackPressureSize) this.worker = worker this.info = this.initWorkerInfo(worker) @@ -80,13 +78,13 @@ implements IWorkerNode { /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.push(task) - if ( - this.onBackPressure != null && - this.hasBackPressure() && - !this.onBackPressureStarted - ) { + if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true - this.onBackPressure(this.info.id as number) + this.dispatchEvent( + new CustomEvent('backpressure', { + detail: { workerId: this.info.id as number } + }) + ) this.onBackPressureStarted = false } return tasksQueueSize @@ -95,13 +93,13 @@ implements IWorkerNode { /** @inheritdoc */ public unshiftTask (task: Task): number { const tasksQueueSize = this.tasksQueue.unshift(task) - if ( - this.onBackPressure != null && - this.hasBackPressure() && - !this.onBackPressureStarted - ) { + if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true - this.onBackPressure(this.info.id as number) + this.dispatchEvent( + new CustomEvent('backpressure', { + detail: { workerId: this.info.id as number } + }) + ) this.onBackPressureStarted = false } return tasksQueueSize @@ -110,11 +108,7 @@ implements IWorkerNode { /** @inheritdoc */ public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() - if ( - this.onEmptyQueue != null && - this.tasksQueue.size === 0 && - this.onEmptyQueueCount === 0 - ) { + if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -123,11 +117,7 @@ implements IWorkerNode { /** @inheritdoc */ public popTask (): Task | undefined { const task = this.tasksQueue.pop() - if ( - this.onEmptyQueue != null && - this.tasksQueue.size === 0 && - this.onEmptyQueueCount === 0 - ) { + if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -198,7 +188,11 @@ implements IWorkerNode { return } ++this.onEmptyQueueCount - this.onEmptyQueue?.(this.info.id as number) + this.dispatchEvent( + new CustomEvent('emptyqueue', { + detail: { workerId: this.info.id as number } + }) + ) await sleep(exponentialDelay(this.onEmptyQueueCount)) await this.startOnEmptyQueue() } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 4c877bc8..43e7fc60 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -209,12 +209,13 @@ export interface IWorker { } /** - * Worker node event callback. + * Worker node event detail. * - * @param workerId - The worker id. * @internal */ -export type WorkerNodeEventCallback = (workerId: number) => void +export interface WorkerNodeEventDetail { + workerId: number +} /** * Worker node interface. @@ -223,7 +224,8 @@ export type WorkerNodeEventCallback = (workerId: number) => void * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @internal */ -export interface IWorkerNode { +export interface IWorkerNode + extends EventTarget { /** * Worker. */ @@ -250,14 +252,6 @@ export interface IWorkerNode { * This is the number of tasks that can be enqueued before the worker node has back pressure. */ tasksQueueBackPressureSize: number - /** - * Callback invoked when worker node tasks queue is back pressured. - */ - onBackPressure?: WorkerNodeEventCallback - /** - * Callback invoked when worker node tasks queue is empty. - */ - onEmptyQueue?: WorkerNodeEventCallback /** * Tasks queue size. * diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index c42dbc94..896610f8 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -634,10 +634,6 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -646,10 +642,6 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -658,17 +650,9 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } await pool.destroy() }) @@ -688,8 +672,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } pool.setTasksQueueOptions({ concurrency: 2, @@ -707,8 +689,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() } pool.setTasksQueueOptions({ concurrency: 1, @@ -725,8 +705,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow( new TypeError('Invalid tasks queue options: must be a plain object') -- 2.34.1