1 import { EventEmitter
} from
'node:events'
2 import { MessageChannel
} from
'node:worker_threads'
4 import { CircularArray
} from
'../circular-array.js'
5 import { Deque
} from
'../deque.js'
6 import { PriorityQueue
} from
'../priority-queue.js'
7 import type { Task
} from
'../utility-types.js'
8 import { DEFAULT_TASK_NAME
} from
'../utils.js'
10 checkWorkerNodeArguments
,
21 type WorkerNodeOptions
,
30 * @typeParam Worker - Type of worker.
31 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
33 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
35 implements IWorkerNode
<Worker
, Data
> {
37 public readonly worker
: Worker
39 public readonly info
: WorkerInfo
41 public usage
: WorkerUsage
43 public strategyData
?: StrategyData
45 public messageChannel
?: MessageChannel
47 public tasksQueueBackPressureSize
: number
48 private readonly tasksQueue
: Deque
<Task
<Data
>>
49 private readonly tasksQueue2
= new PriorityQueue
<Task
<Data
>>()
50 private onBackPressureStarted
: boolean
51 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
54 * Constructs a new worker node.
56 * @param type - The worker type.
57 * @param filePath - Path to the worker file.
58 * @param opts - The worker node options.
60 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
62 checkWorkerNodeArguments(type, filePath
, opts
)
63 this.worker
= createWorker
<Worker
>(type, filePath
, {
65 workerOptions
: opts
.workerOptions
67 this.info
= this.initWorkerInfo(this.worker
)
68 this.usage
= this.initWorkerUsage()
69 if (this.info
.type === WorkerTypes
.thread
) {
70 this.messageChannel
= new MessageChannel()
72 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
73 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
74 this.tasksQueue
= new Deque
<Task
<Data
>>()
75 this.onBackPressureStarted
= false
76 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
80 public tasksQueueSize (): number {
81 return this.tasksQueue
.size
85 public enqueueTask (task
: Task
<Data
>): number {
86 const tasksQueueSize
= this.tasksQueue
.push(task
)
87 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
88 this.onBackPressureStarted
= true
89 this.emit('backPressure', { workerId
: this.info
.id
})
90 this.onBackPressureStarted
= false
96 public unshiftTask (task
: Task
<Data
>): number {
97 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
98 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
99 this.onBackPressureStarted
= true
100 this.emit('backPressure', { workerId
: this.info
.id
})
101 this.onBackPressureStarted
= false
103 return tasksQueueSize
107 public dequeueTask (): Task
<Data
> | undefined {
108 return this.tasksQueue
.shift()
112 public popTask (): Task
<Data
> | undefined {
113 return this.tasksQueue
.pop()
117 public clearTasksQueue (): void {
118 this.tasksQueue
.clear()
122 public hasBackPressure (): boolean {
123 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
127 public async terminate (): Promise
<void> {
128 const waitWorkerExit
= new Promise
<void>(resolve
=> {
129 this.registerOnceWorkerEventHandler('exit', () => {
133 this.closeMessageChannel()
134 this.removeAllListeners()
135 switch (this.info
.type) {
136 case WorkerTypes
.thread
:
137 this.worker
.unref
?.()
138 await this.worker
.terminate
?.()
140 case WorkerTypes
.cluster
:
141 this.registerOnceWorkerEventHandler('disconnect', () => {
144 this.worker
.disconnect
?.()
151 public registerWorkerEventHandler (
153 handler
: EventHandler
<Worker
>
155 this.worker
.on(event
, handler
)
159 public registerOnceWorkerEventHandler (
161 handler
: EventHandler
<Worker
>
163 this.worker
.once(event
, handler
)
167 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
168 if (!Array.isArray(this.info
.taskFunctionsProperties
)) {
170 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
174 Array.isArray(this.info
.taskFunctionsProperties
) &&
175 this.info
.taskFunctionsProperties
.length
< 3
178 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
181 if (name
=== DEFAULT_TASK_NAME
) {
182 name
= this.info
.taskFunctionsProperties
[1].name
184 if (!this.taskFunctionsUsage
.has(name
)) {
185 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
187 return this.taskFunctionsUsage
.get(name
)
191 public deleteTaskFunctionWorkerUsage (name
: string): boolean {
192 return this.taskFunctionsUsage
.delete(name
)
195 private closeMessageChannel (): void {
196 if (this.messageChannel
!= null) {
197 this.messageChannel
.port1
.unref()
198 this.messageChannel
.port2
.unref()
199 this.messageChannel
.port1
.close()
200 this.messageChannel
.port2
.close()
201 delete this.messageChannel
205 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
207 id
: getWorkerId(worker
),
208 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
209 type: getWorkerType(worker
)!,
216 private initWorkerUsage (): WorkerUsage
{
217 const getTasksQueueSize
= (): number => {
218 return this.tasksQueue
.size
220 const getTasksQueueMaxSize
= (): number => {
221 return this.tasksQueue
.maxSize
227 get
queued (): number {
228 return getTasksQueueSize()
230 get
maxQueued (): number {
231 return getTasksQueueMaxSize()
233 sequentiallyStolen
: 0,
238 history
: new CircularArray
<number>()
241 history
: new CircularArray
<number>()
245 history
: new CircularArray
<number>()
248 history
: new CircularArray
<number>()
254 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
255 const getTaskFunctionQueueSize
= (): number => {
256 let taskFunctionQueueSize
= 0
257 for (const task
of this.tasksQueue
) {
259 (task
.name
=== DEFAULT_TASK_NAME
&&
260 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
261 name
=== this.info
.taskFunctionsProperties
![1].name
) ||
262 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
264 ++taskFunctionQueueSize
267 return taskFunctionQueueSize
273 get
queued (): number {
274 return getTaskFunctionQueueSize()
276 sequentiallyStolen
: 0,
281 history
: new CircularArray
<number>()
284 history
: new CircularArray
<number>()
288 history
: new CircularArray
<number>()
291 history
: new CircularArray
<number>()