1 import { EventEmitter
} from
'node:events'
2 import { MessageChannel
} from
'node:worker_threads'
4 import type { Task
} from
'../utility-types.js'
6 import { CircularBuffer
} from
'../circular-buffer.js'
7 import { PriorityQueue
} from
'../queues/priority-queue.js'
8 import { DEFAULT_TASK_NAME
} from
'../utils.js'
10 checkWorkerNodeArguments
,
18 MeasurementHistorySize
,
21 type WorkerNodeOptions
,
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
32 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
34 implements IWorkerNode
<Worker
, Data
> {
36 public readonly info
: WorkerInfo
38 public messageChannel
?: MessageChannel
40 public strategyData
?: StrategyData
42 public readonly tasksQueue
: PriorityQueue
<Task
<Data
>>
44 public tasksQueueBackPressureSize
: number
46 public usage
: WorkerUsage
48 public readonly worker
: Worker
49 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
52 * Constructs a new worker node.
53 * @param type - The worker type.
54 * @param filePath - Path to the worker file.
55 * @param opts - The worker node options.
57 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
59 checkWorkerNodeArguments(type, filePath
, opts
)
60 this.worker
= createWorker
<Worker
>(type, filePath
, {
62 workerOptions
: opts
.workerOptions
,
64 this.info
= initWorkerInfo(this.worker
)
65 this.usage
= this.initWorkerUsage()
66 if (this.info
.type === WorkerTypes
.thread
) {
67 this.messageChannel
= new MessageChannel()
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
71 this.tasksQueue
= new PriorityQueue
<Task
<Data
>>(
72 opts
.tasksQueueBucketSize
,
73 opts
.tasksQueuePriority
75 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
79 public clearTasksQueue (): void {
80 this.tasksQueue
.clear()
84 public deleteTask (task
: Task
<Data
>): boolean {
85 return this.tasksQueue
.delete(task
)
89 public deleteTaskFunctionWorkerUsage (name
: string): boolean {
90 return this.taskFunctionsUsage
.delete(name
)
94 public dequeueLastPrioritizedTask (): Task
<Data
> | undefined {
95 // Start from the last empty or partially filled bucket
96 return this.dequeueTask(this.tasksQueue
.buckets
+ 1)
100 public dequeueTask (bucket
?: number): Task
<Data
> | undefined {
101 const task
= this.tasksQueue
.dequeue(bucket
)
102 if (!this.hasBackPressure() && this.info
.backPressure
) {
103 this.info
.backPressure
= false
109 public enqueueTask (task
: Task
<Data
>): number {
110 const tasksQueueSize
= this.tasksQueue
.enqueue(task
, task
.priority
)
111 if (this.hasBackPressure() && !this.info
.backPressure
) {
112 this.info
.backPressure
= true
113 this.emit('backPressure', { workerId
: this.info
.id
})
115 return tasksQueueSize
119 public getTaskFunctionWorkerUsage (name
: string): undefined | WorkerUsage
{
120 if (!Array.isArray(this.info
.taskFunctionsProperties
)) {
122 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
126 Array.isArray(this.info
.taskFunctionsProperties
) &&
127 this.info
.taskFunctionsProperties
.length
< 3
130 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
133 if (name
=== DEFAULT_TASK_NAME
) {
134 name
= this.info
.taskFunctionsProperties
[1].name
136 if (!this.taskFunctionsUsage
.has(name
)) {
137 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
139 return this.taskFunctionsUsage
.get(name
)
143 public registerOnceWorkerEventHandler (
145 handler
: EventHandler
<Worker
>
147 this.worker
.once(event
, handler
)
151 public registerWorkerEventHandler (
153 handler
: EventHandler
<Worker
>
155 this.worker
.on(event
, handler
)
159 public setTasksQueuePriority (enablePriority
: boolean): void {
160 this.tasksQueue
.enablePriority
= enablePriority
164 public tasksQueueSize (): number {
165 return this.tasksQueue
.size
169 public async terminate (): Promise
<void> {
170 const waitWorkerExit
= new Promise
<void>(resolve
=> {
171 this.registerOnceWorkerEventHandler('exit', () => {
175 this.closeMessageChannel()
176 this.removeAllListeners()
177 switch (this.info
.type) {
178 case WorkerTypes
.cluster
:
179 this.registerOnceWorkerEventHandler('disconnect', () => {
182 this.worker
.disconnect
?.()
184 case WorkerTypes
.thread
:
185 this.worker
.unref
?.()
186 await this.worker
.terminate
?.()
192 private closeMessageChannel (): void {
193 if (this.messageChannel
!= null) {
194 this.messageChannel
.port1
.unref()
195 this.messageChannel
.port2
.unref()
196 this.messageChannel
.port1
.close()
197 this.messageChannel
.port2
.close()
198 delete this.messageChannel
203 * Whether the worker node is back pressured or not.
204 * @returns `true` if the worker node is back pressured, `false` otherwise.
206 private hasBackPressure (): boolean {
207 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
210 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
211 const getTaskFunctionQueueSize
= (): number => {
212 let taskFunctionQueueSize
= 0
213 for (const task
of this.tasksQueue
) {
215 (task
.name
=== DEFAULT_TASK_NAME
&&
216 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
217 name
=== this.info
.taskFunctionsProperties
![1].name
) ||
218 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
220 ++taskFunctionQueueSize
223 return taskFunctionQueueSize
228 history
: new CircularBuffer(MeasurementHistorySize
),
231 history
: new CircularBuffer(MeasurementHistorySize
),
235 history
: new CircularBuffer(MeasurementHistorySize
),
241 get
queued (): number {
242 return getTaskFunctionQueueSize()
244 sequentiallyStolen
: 0,
248 history
: new CircularBuffer(MeasurementHistorySize
),
253 private initWorkerUsage (): WorkerUsage
{
254 const getTasksQueueSize
= (): number => {
255 return this.tasksQueue
.size
257 const getTasksQueueMaxSize
= (): number => {
258 return this.tasksQueue
.maxSize
263 history
: new CircularBuffer(MeasurementHistorySize
),
266 history
: new CircularBuffer(MeasurementHistorySize
),
270 history
: new CircularBuffer(MeasurementHistorySize
),
276 get
maxQueued (): number {
277 return getTasksQueueMaxSize()
279 get
queued (): number {
280 return getTasksQueueSize()
282 sequentiallyStolen
: 0,
286 history
: new CircularBuffer(MeasurementHistorySize
),