private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].removeEventListener(
+ this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'idleWorkerNode',
- this.handleIdleWorkerNodeEvent as EventListener
+ this.handleIdleWorkerNodeEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].addEventListener(
+ this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent as EventListener
+ this.handleBackPressureEvent
)
}
}
}
private readonly handleIdleWorkerNodeEvent = (
- event: CustomEvent<WorkerNodeEventDetail>,
+ eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- const { workerNodeKey } = event.detail
+ const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
'WorkerNode event detail workerNodeKey attribute must be defined'
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(event, stolenTask)
+ this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
}
private readonly handleBackPressureEvent = (
- event: CustomEvent<WorkerNodeEventDetail>
+ eventDetail: WorkerNodeEventDetail
): void => {
- const { workerId } = event.detail
+ const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
- detail: { workerId: workerId as number, workerNodeKey }
- })
- )
+ this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
}
}
}
import { MessageChannel } from 'node:worker_threads'
+import { EventEmitter } from 'node:events'
import { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
type IWorkerNode,
type StrategyData,
type WorkerInfo,
- type WorkerNodeEventDetail,
type WorkerType,
WorkerTypes,
type WorkerUsage
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
export class WorkerNode<Worker extends IWorker, Data = unknown>
- extends EventTarget
+ extends EventEmitter
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public readonly worker: Worker
const tasksQueueSize = this.tasksQueue.push(task)
if (this.hasBackPressure() && !this.onBackPressureStarted) {
this.onBackPressureStarted = true
- this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('backPressure', {
- detail: { workerId: this.info.id as number }
- })
- )
+ this.emit('backPressure', { workerId: this.info.id as number })
this.onBackPressureStarted = false
}
return tasksQueueSize
const tasksQueueSize = this.tasksQueue.unshift(task)
if (this.hasBackPressure() && !this.onBackPressureStarted) {
this.onBackPressureStarted = true
- this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('backPressure', {
- detail: { workerId: this.info.id as number }
- })
- )
+ this.emit('backPressure', { workerId: this.info.id as number })
this.onBackPressureStarted = false
}
return tasksQueueSize