1 import { MessageChannel
} from
'node:worker_threads'
2 import { EventEmitter
} from
'node:events'
3 import { CircularArray
} from
'../circular-array.js'
4 import type { Task
} from
'../utility-types.js'
5 import { DEFAULT_TASK_NAME
} from
'../utils.js'
6 import { Deque
} from
'../deque.js'
13 type WorkerNodeOptions
,
19 checkWorkerNodeArguments
,
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
33 implements IWorkerNode
<Worker
, Data
> {
35 public readonly worker
: Worker
37 public readonly info
: WorkerInfo
39 public usage
: WorkerUsage
41 public strategyData
?: StrategyData
43 public messageChannel
?: MessageChannel
45 public tasksQueueBackPressureSize
: number
46 private readonly tasksQueue
: Deque
<Task
<Data
>>
47 private onBackPressureStarted
: boolean
48 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
51 * Constructs a new worker node.
53 * @param type - The worker type.
54 * @param filePath - Path to the worker file.
55 * @param opts - The worker node options.
57 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
59 checkWorkerNodeArguments(type, filePath
, opts
)
60 this.worker
= createWorker
<Worker
>(type, filePath
, {
62 workerOptions
: opts
.workerOptions
64 this.info
= this.initWorkerInfo(this.worker
)
65 this.usage
= this.initWorkerUsage()
66 if (this.info
.type === WorkerTypes
.thread
) {
67 this.messageChannel
= new MessageChannel()
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
71 this.tasksQueue
= new Deque
<Task
<Data
>>()
72 this.onBackPressureStarted
= false
73 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
77 public tasksQueueSize (): number {
78 return this.tasksQueue
.size
82 public enqueueTask (task
: Task
<Data
>): number {
83 const tasksQueueSize
= this.tasksQueue
.push(task
)
84 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
85 this.onBackPressureStarted
= true
86 this.emit('backPressure', { workerId
: this.info
.id
})
87 this.onBackPressureStarted
= false
93 public unshiftTask (task
: Task
<Data
>): number {
94 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
95 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
96 this.onBackPressureStarted
= true
97 this.emit('backPressure', { workerId
: this.info
.id
})
98 this.onBackPressureStarted
= false
100 return tasksQueueSize
104 public dequeueTask (): Task
<Data
> | undefined {
105 return this.tasksQueue
.shift()
109 public popTask (): Task
<Data
> | undefined {
110 return this.tasksQueue
.pop()
114 public clearTasksQueue (): void {
115 this.tasksQueue
.clear()
119 public hasBackPressure (): boolean {
120 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
124 public resetUsage (): void {
125 this.usage
= this.initWorkerUsage()
126 this.taskFunctionsUsage
.clear()
130 public async terminate (): Promise
<void> {
131 const waitWorkerExit
= new Promise
<void>(resolve
=> {
132 this.registerOnceWorkerEventHandler('exit', () => {
136 this.closeMessageChannel()
137 this.removeAllListeners()
138 switch (this.info
.type) {
139 case WorkerTypes
.thread
:
140 this.worker
.unref
?.()
141 await this.worker
.terminate
?.()
143 case WorkerTypes
.cluster
:
144 this.registerOnceWorkerEventHandler('disconnect', () => {
147 this.worker
.disconnect
?.()
154 public registerWorkerEventHandler (
156 handler
: EventHandler
<Worker
>
158 this.worker
.on(event
, handler
)
162 public registerOnceWorkerEventHandler (
164 handler
: EventHandler
<Worker
>
166 this.worker
.once(event
, handler
)
170 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
171 if (!Array.isArray(this.info
.taskFunctionNames
)) {
173 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
177 Array.isArray(this.info
.taskFunctionNames
) &&
178 this.info
.taskFunctionNames
.length
< 3
181 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
184 if (name
=== DEFAULT_TASK_NAME
) {
185 name
= this.info
.taskFunctionNames
[1]
187 if (!this.taskFunctionsUsage
.has(name
)) {
188 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
190 return this.taskFunctionsUsage
.get(name
)
194 public deleteTaskFunctionWorkerUsage (name
: string): boolean {
195 return this.taskFunctionsUsage
.delete(name
)
198 private closeMessageChannel (): void {
199 if (this.messageChannel
!= null) {
200 this.messageChannel
.port1
.unref()
201 this.messageChannel
.port2
.unref()
202 this.messageChannel
.port1
.close()
203 this.messageChannel
.port2
.close()
204 delete this.messageChannel
208 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
210 id
: getWorkerId(worker
),
211 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
212 type: getWorkerType(worker
)!,
219 private initWorkerUsage (): WorkerUsage
{
220 const getTasksQueueSize
= (): number => {
221 return this.tasksQueue
.size
223 const getTasksQueueMaxSize
= (): number => {
224 return this.tasksQueue
.maxSize
230 get
queued (): number {
231 return getTasksQueueSize()
233 get
maxQueued (): number {
234 return getTasksQueueMaxSize()
236 sequentiallyStolen
: 0,
241 history
: new CircularArray
<number>()
244 history
: new CircularArray
<number>()
248 history
: new CircularArray
<number>()
251 history
: new CircularArray
<number>()
257 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
258 const getTaskFunctionQueueSize
= (): number => {
259 let taskFunctionQueueSize
= 0
260 for (const task
of this.tasksQueue
) {
262 (task
.name
=== DEFAULT_TASK_NAME
&&
263 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
264 name
=== this.info
.taskFunctionNames
![1]) ||
265 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
267 ++taskFunctionQueueSize
270 return taskFunctionQueueSize
276 get
queued (): number {
277 return getTaskFunctionQueueSize()
279 sequentiallyStolen
: 0,
284 history
: new CircularArray
<number>()
287 history
: new CircularArray
<number>()
291 history
: new CircularArray
<number>()
294 history
: new CircularArray
<number>()