From: Jérôme Benoit Date: Sat, 25 Nov 2023 13:50:00 +0000 (+0100) Subject: fix: rely on node event emitter for worker node events X-Git-Tag: v3.0.8~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=e1c2dba7ec9c30556abfc492327abdc8957a664b;p=poolifier.git fix: rely on node event emitter for worker node events Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5da69ca1..eb92eb54 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -630,36 +630,36 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -1403,15 +1403,15 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -1532,10 +1532,10 @@ export abstract class AbstractPool< } private readonly handleIdleWorkerNodeEvent = ( - event: CustomEvent, + eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - const { workerNodeKey } = event.detail + const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( 'WorkerNode event detail workerNodeKey attribute must be defined' @@ -1586,7 +1586,7 @@ export abstract class AbstractPool< } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(event, stolenTask) + this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1624,9 +1624,9 @@ export abstract class AbstractPool< } private readonly handleBackPressureEvent = ( - event: CustomEvent + eventDetail: WorkerNodeEventDetail ): void => { - const { workerId } = event.detail + const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return @@ -1728,11 +1728,10 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].dispatchEvent( - new CustomEvent('idleWorkerNode', { - detail: { workerId: workerId as number, workerNodeKey } - }) - ) + this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerId: workerId as number, + workerNodeKey + }) } } } diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 3de3cb80..59c4de7a 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,4 +1,5 @@ import { MessageChannel } from 'node:worker_threads' +import { EventEmitter } from 'node:events' import { CircularArray } from '../circular-array' import type { Task } from '../utility-types' import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils' @@ -8,7 +9,6 @@ import { type IWorkerNode, type StrategyData, type WorkerInfo, - type WorkerNodeEventDetail, type WorkerType, WorkerTypes, type WorkerUsage @@ -22,7 +22,7 @@ import { checkWorkerNodeArguments } from './utils' * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. */ export class WorkerNode - extends EventTarget + extends EventEmitter implements IWorkerNode { /** @inheritdoc */ public readonly worker: Worker @@ -71,11 +71,7 @@ export class WorkerNode const tasksQueueSize = this.tasksQueue.push(task) if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true - this.dispatchEvent( - new CustomEvent('backPressure', { - detail: { workerId: this.info.id as number } - }) - ) + this.emit('backPressure', { workerId: this.info.id as number }) this.onBackPressureStarted = false } return tasksQueueSize @@ -86,11 +82,7 @@ export class WorkerNode const tasksQueueSize = this.tasksQueue.unshift(task) if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true - this.dispatchEvent( - new CustomEvent('backPressure', { - detail: { workerId: this.info.id as number } - }) - ) + this.emit('backPressure', { workerId: this.info.id as number }) this.onBackPressureStarted = false } return tasksQueueSize diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 776986d5..5439606d 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,4 +1,5 @@ import type { MessageChannel } from 'node:worker_threads' +import type { EventEmitter } from 'node:events' import type { CircularArray } from '../circular-array' import type { Task } from '../utility-types' @@ -238,7 +239,7 @@ export interface WorkerNodeEventDetail { * @internal */ export interface IWorkerNode - extends EventTarget { + extends EventEmitter { /** * Worker. */