1 import { EventEmitter
} from
'node:events'
2 import { MessageChannel
} from
'node:worker_threads'
4 import { CircularBuffer
} from
'../circular-buffer.js'
5 import { PriorityQueue
} from
'../priority-queue.js'
6 import type { Task
} from
'../utility-types.js'
7 import { DEFAULT_TASK_NAME
} from
'../utils.js'
9 checkWorkerNodeArguments
,
18 MeasurementHistorySize
,
21 type WorkerNodeOptions
,
30 * @typeParam Worker - Type of worker.
31 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
33 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
35 implements IWorkerNode
<Worker
, Data
> {
/** Underlying worker instance; assigned in the constructor from createWorker(). */
37 public readonly worker
: Worker
/** Worker information; assigned in the constructor from initWorkerInfo(). */
39 public readonly info
: WorkerInfo
/** Worker usage statistics; assigned in the constructor from initWorkerUsage(). */
41 public usage
: WorkerUsage
/** Optional strategy data. NOTE(review): never written in the visible code; presumably owned by the worker choice strategies - confirm. */
43 public strategyData
?: StrategyData
/** Message channel; the constructor creates it only when info.type is WorkerTypes.thread, and closeMessageChannel() deletes it. */
45 public messageChannel
?: MessageChannel
/** Tasks queue size at or above which hasBackPressure() returns true. */
47 public tasksQueueBackPressureSize
: number
/** Priority queue holding the tasks enqueued on this worker node. */
48 private readonly tasksQueue
: PriorityQueue
<Task
<Data
>>
/** Guard flag raised while enqueueTask() or dequeueTask() is updating info.backPressure. */
49 private setBackPressureFlag
: boolean
/** Usage statistics per task function name; filled lazily by getTaskFunctionWorkerUsage(). */
50 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
53 * Constructs a new worker node.
55 * @param type - The worker type.
56 * @param filePath - Path to the worker file.
57 * @param opts - The worker node options.
// Creates the worker through createWorker() and initializes the node state:
// worker info, worker usage, optional message channel, back pressure
// threshold, tasks queue, guard flag and per task function usage map.
// NOTE(review): the original numbering has gaps in this block (for instance
// between lines 59 and 61, and between 62 and 64); the statements on those
// lines are not visible here - check the full file before editing.
59 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
// The arguments go through checkWorkerNodeArguments() before the worker is
// created (presumably validation that throws on bad input - confirm).
61 checkWorkerNodeArguments(type, filePath
, opts
)
62 this.worker
= createWorker
<Worker
>(type, filePath
, {
64 workerOptions
: opts
.workerOptions
66 this.info
= this.initWorkerInfo(this.worker
)
67 this.usage
= this.initWorkerUsage()
// Only thread workers get a MessageChannel.
68 if (this.info
.type === WorkerTypes
.thread
) {
69 this.messageChannel
= new MessageChannel()
// The option is asserted non-null; no fallback value is supplied in this block.
71 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
72 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
73 this.tasksQueue
= new PriorityQueue
<Task
<Data
>>(
74 opts
.tasksQueueBucketSize
,
75 opts
.tasksQueuePriority
77 this.setBackPressureFlag
= false
78 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Switches priority handling of this node's tasks queue.
 *
 * @param enablePriority - Value written to the tasks queue `enablePriority` property.
 */
public setTasksQueuePriority (enablePriority: boolean): void {
  const { tasksQueue } = this
  tasksQueue.enablePriority = enablePriority
}
/**
 * Reports how many tasks this node's tasks queue currently holds.
 *
 * @returns The `size` of the underlying tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Adds a task to this node's tasks queue and signals back pressure when the
 * queue has reached the configured threshold.
 *
 * The `backPressure` event is emitted only on the transition into back
 * pressure, i.e. when `info.backPressure` is not already set.
 *
 * @param task - The task to enqueue; its `priority` is forwarded to the queue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
  if (
    !this.setBackPressureFlag &&
    this.hasBackPressure() &&
    !this.info.backPressure
  ) {
    // Guard flag: listeners run synchronously inside emit() and can re-enter
    // enqueueTask()/dequeueTask() on this node.
    this.setBackPressureFlag = true
    try {
      this.info.backPressure = true
      this.emit('backPressure', { workerId: this.info.id })
    } finally {
      // Always release the guard, even when a listener throws: a guard left
      // raised would prevent back pressure from ever being signalled or
      // cleared again on this node.
      this.setBackPressureFlag = false
    }
  }
  return tasksQueueSize
}
/**
 * Takes a task out of this node's tasks queue and lowers the back pressure
 * indicator once the queue is no longer at the threshold.
 *
 * @param bucket - Optional bucket forwarded to the tasks queue `dequeue()` call.
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
public dequeueTask (bucket?: number): Task<Data> | undefined {
  const dequeued = this.tasksQueue.dequeue(bucket)
  const mustClearBackPressure =
    !this.setBackPressureFlag &&
    !this.hasBackPressure() &&
    this.info.backPressure
  if (mustClearBackPressure) {
    this.setBackPressureFlag = true
    this.info.backPressure = false
    this.setBackPressureFlag = false
  }
  return dequeued
}
/**
 * Dequeues a task starting from the far end of the tasks queue.
 *
 * @returns Whatever `dequeueTask()` returns for the computed bucket.
 */
public dequeueLastPrioritizedTask (): Task<Data> | undefined {
  // Start from the last empty or partially filled bucket
  const startBucket = this.tasksQueue.buckets + 1
  return this.dequeueTask(startBucket)
}
/**
 * Empties this node's tasks queue by delegating to its `clear()` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether this node's tasks queue has reached the back pressure threshold.
 *
 * @returns `true` when the queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const { size } = this.tasksQueue
  return size >= this.tasksQueueBackPressureSize
}
/**
 * Terminates the worker of this node.
 *
 * Registers a one-time 'exit' handler on the worker, closes the message
 * channel, removes every listener registered on this node, then stops the
 * worker according to info.type: a thread worker is unref-ed and terminated,
 * a cluster worker gets a one-time 'disconnect' handler and is disconnected.
 *
 * NOTE(review): the bodies of the 'exit' and 'disconnect' handlers and the
 * end of the method are not visible in this view; presumably the returned
 * promise settles once the worker has exited - confirm against the full file.
 */
139 public async terminate (): Promise
<void> {
140 const waitWorkerExit
= new Promise
<void>(resolve
=> {
141 this.registerOnceWorkerEventHandler('exit', () => {
// Release the message channel and the listeners of this node before the worker is stopped.
145 this.closeMessageChannel()
146 this.removeAllListeners()
147 switch (this.info
.type) {
148 case WorkerTypes
.thread
:
// unref and terminate are optional calls: each is skipped when the worker does not define it.
149 this.worker
.unref
?.()
150 await this.worker
.terminate
?.()
152 case WorkerTypes
.cluster
:
153 this.registerOnceWorkerEventHandler('disconnect', () => {
// disconnect is an optional call as well.
156 this.worker
.disconnect
?.()
/**
 * Registers handler on the underlying worker for the given event, through
 * the worker on() method.
 *
 * NOTE(review): the declaration of the event parameter is not visible in this
 * view; only the handler parameter and the forwarding call are.
 */
163 public registerWorkerEventHandler (
165 handler
: EventHandler
<Worker
>
167 this.worker
.on(event
, handler
)
/**
 * Registers handler on the underlying worker for the given event, through
 * the worker once() method.
 *
 * NOTE(review): the declaration of the event parameter is not visible in this
 * view; only the handler parameter and the forwarding call are.
 */
171 public registerOnceWorkerEventHandler (
173 handler
: EventHandler
<Worker
>
175 this.worker
.once(event
, handler
)
/**
 * Returns the usage statistics tracked for one task function, creating the
 * entry on first access.
 *
 * The lookup is refused (see the two messages below) while
 * info.taskFunctionsProperties is not an array or holds fewer than 3 elements.
 * DEFAULT_TASK_NAME is resolved to the name of the element at index 1 of
 * info.taskFunctionsProperties before the lookup.
 *
 * NOTE(review): the statements wrapping the two messages are not visible in
 * this view; presumably each message is thrown as an error - confirm.
 *
 * @param name - Task function name, or DEFAULT_TASK_NAME.
 * @returns The usage entry stored in taskFunctionsUsage for the resolved name.
 */
179 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
180 if (!Array.isArray(this.info
.taskFunctionsProperties
)) {
182 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
// NOTE(review): this Array.isArray() test repeats the one above; it looks
// redundant if the first guard always exits - confirm before removing it.
186 Array.isArray(this.info
.taskFunctionsProperties
) &&
187 this.info
.taskFunctionsProperties
.length
< 3
190 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
// The default task name is an alias: the parameter is rebound to the name found at index 1.
193 if (name
=== DEFAULT_TASK_NAME
) {
194 name
= this.info
.taskFunctionsProperties
[1].name
// Create the usage entry lazily, the first time a name is requested.
196 if (!this.taskFunctionsUsage
.has(name
)) {
197 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
199 return this.taskFunctionsUsage
.get(name
)
/**
 * Drops the usage statistics kept for a task function.
 *
 * @param name - Key of the entry to remove from the task functions usage map.
 * @returns `true` when an entry existed and was removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const removed = this.taskFunctionsUsage.delete(name)
  return removed
}
/**
 * Releases the message channel of this node, if one exists: both ports are
 * unref-ed, then closed, and the `messageChannel` property is deleted.
 */
private closeMessageChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Same order as before: unref both ports first, then close both.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Builds the initial WorkerInfo of this node: id comes from
 * getWorkerId(worker) and type from getWorkerType(worker), whose result is
 * asserted non-null.
 *
 * NOTE(review): the remaining properties of the returned object are not
 * visible in this view (gap in the original numbering after line 221).
 *
 * @param worker - The worker to describe.
 * @returns The initial worker information.
 */
217 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
219 id
: getWorkerId(worker
),
220 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
221 type: getWorkerType(worker
)!,
/**
 * Builds the initial WorkerUsage of this node.
 *
 * queued and maxQueued are getters that read the size and maxSize of the
 * tasks queue at access time, through the two closures declared first.
 * sequentiallyStolen starts at 0, and each of the four history properties
 * gets its own CircularBuffer created with MeasurementHistorySize.
 *
 * NOTE(review): the nesting of the returned object and its other properties
 * are not visible in this view (gaps in the original numbering).
 *
 * @returns The initial worker usage.
 */
229 private initWorkerUsage (): WorkerUsage
{
// Arrow functions keep this bound to the worker node; the getters below call
// them instead of using this directly.
230 const getTasksQueueSize
= (): number => {
231 return this.tasksQueue
.size
233 const getTasksQueueMaxSize
= (): number => {
234 return this.tasksQueue
.maxSize
240 get
queued (): number {
241 return getTasksQueueSize()
243 get
maxQueued (): number {
244 return getTasksQueueMaxSize()
246 sequentiallyStolen
: 0,
251 history
: new CircularBuffer(MeasurementHistorySize
)
254 history
: new CircularBuffer(MeasurementHistorySize
)
258 history
: new CircularBuffer(MeasurementHistorySize
)
261 history
: new CircularBuffer(MeasurementHistorySize
)
/**
 * Builds the initial WorkerUsage tracked for one task function.
 *
 * queued is a getter that walks the tasks queue at access time and counts the
 * tasks belonging to the given name. sequentiallyStolen starts at 0, and each
 * of the four history properties gets its own CircularBuffer created with
 * MeasurementHistorySize.
 *
 * NOTE(review): the nesting of the returned object, its other properties and
 * the end of the method are not visible in this view.
 *
 * @param name - Task function name the usage is tracked for.
 * @returns The initial task function worker usage.
 */
267 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
268 const getTaskFunctionQueueSize
= (): number => {
269 let taskFunctionQueueSize
= 0
270 for (const task
of this.tasksQueue
) {
// A queued task counts for name in two cases: it carries DEFAULT_TASK_NAME
// and name equals the name at index 1 of info.taskFunctionsProperties
// (asserted non-null), or it carries another name that equals name.
272 (task
.name
=== DEFAULT_TASK_NAME
&&
273 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
274 name
=== this.info
.taskFunctionsProperties
![1].name
) ||
275 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
277 ++taskFunctionQueueSize
280 return taskFunctionQueueSize
286 get
queued (): number {
287 return getTaskFunctionQueueSize()
289 sequentiallyStolen
: 0,
294 history
: new CircularBuffer(MeasurementHistorySize
)
297 history
: new CircularBuffer(MeasurementHistorySize
)
301 history
: new CircularBuffer(MeasurementHistorySize
)
304 history
: new CircularBuffer(MeasurementHistorySize
)