1 import { EventEmitter
} from
'node:events'
2 import { MessageChannel
} from
'node:worker_threads'
4 import { CircularArray
} from
'../circular-array.js'
5 import { Deque
} from
'../deque.js'
6 import type { Task
} from
'../utility-types.js'
7 import { DEFAULT_TASK_NAME
} from
'../utils.js'
9 checkWorkerNodeArguments
,
20 type WorkerNodeOptions
,
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
32 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
34 implements IWorkerNode
<Worker
, Data
> {
/** Underlying worker instance wrapped by this node; created once in the constructor. */
public readonly worker: Worker
/** Descriptive information about the worker (such as its id and type). */
public readonly info: WorkerInfo
/** Usage statistics of the worker; replaced by a fresh object when the usage is reset. */
public usage: WorkerUsage
/** Optional strategy data slot. NOTE(review): not read or written in the code visible here — confirm the owner. */
public strategyData?: StrategyData
/** Message channel, only allocated when the worker info type is the thread type. */
public messageChannel?: MessageChannel
/** Tasks queue size at or above which the node reports back pressure. */
public tasksQueueBackPressureSize: number
/** Double-ended queue holding the tasks waiting on this node. */
private readonly tasksQueue: Deque<Task<Data>>
/** Flag raised while the 'backPressure' event is being emitted, to avoid re-entrant emission. */
private onBackPressureStarted: boolean
/** Usage statistics per task function, keyed by task function name. */
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
52 * Constructs a new worker node.
54 * @param type - The worker type.
55 * @param filePath - Path to the worker file.
56 * @param opts - The worker node options.
/**
 * Builds a worker node around a newly created worker.
 *
 * Steps visible here: validate the arguments with checkWorkerNodeArguments, create the worker
 * with createWorker, initialize the worker info and usage, allocate a MessageChannel for
 * thread workers only, then set up the back pressure threshold, the tasks queue, the back
 * pressure guard flag and the per task function usage map.
 *
 * NOTE(review): this view seems to be missing some original lines (for instance the end of
 * the options literal passed to createWorker) - confirm against the complete file.
 */
58 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
// Argument validation happens before any worker is created.
60 checkWorkerNodeArguments(type, filePath
, opts
)
61 this.worker
= createWorker
<Worker
>(type, filePath
, {
63 workerOptions
: opts
.workerOptions
// The info must be initialized before the type check below, which reads this.info.type.
65 this.info
= this.initWorkerInfo(this.worker
)
66 this.usage
= this.initWorkerUsage()
// Only thread workers get a dedicated message channel.
67 if (this.info
.type === WorkerTypes
.thread
) {
68 this.messageChannel
= new MessageChannel()
70 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
72 this.tasksQueue
= new Deque
<Task
<Data
>>()
73 this.onBackPressureStarted
= false
74 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Reports how many tasks are currently held in the tasks queue of this worker node.
 *
 * @returns The current size of the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Appends a task at the back of the tasks queue and, when the queue has reached the back
 * pressure threshold, emits a backPressure event carrying the worker id.
 *
 * NOTE(review): the return statement is not visible in this view; presumably the value
 * stored in tasksQueueSize is returned, as the sibling unshiftTask does - confirm.
 */
83 public enqueueTask (task
: Task
<Data
>): number {
84 const tasksQueueSize
= this.tasksQueue
.push(task
)
// The flag check prevents emitting again while an emission is already in progress.
85 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
86 this.onBackPressureStarted
= true
87 this.emit('backPressure', { workerId
: this.info
.id
})
// Lower the guard once the listeners have run.
88 this.onBackPressureStarted
= false
/**
 * Inserts a task at the front of the tasks queue.
 *
 * When the queue has reached the back pressure threshold and no back pressure emission is
 * already in progress, a 'backPressure' event carrying the worker id is emitted.
 *
 * @param task - The task to insert at the front of the queue.
 * @returns The value returned by the queue insertion, i.e. the tasks queue size.
 */
public unshiftTask (task: Task<Data>): number {
  const queueSizeAfterInsert = this.tasksQueue.unshift(task)
  // Nothing to signal: either there is no back pressure or an emission is already running.
  if (!this.hasBackPressure() || this.onBackPressureStarted) {
    return queueSizeAfterInsert
  }
  // Raise the guard for the duration of the emission so listeners cannot re-trigger it.
  this.onBackPressureStarted = true
  const payload = { workerId: this.info.id }
  this.emit('backPressure', payload)
  this.onBackPressureStarted = false
  return queueSizeAfterInsert
}
/**
 * Removes and returns the task at the front of the tasks queue.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const frontTask = this.tasksQueue.shift()
  return frontTask
}
/**
 * Removes and returns the task at the back of the tasks queue.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public popTask (): Task<Data> | undefined {
  const backTask = this.tasksQueue.pop()
  return backTask
}
/**
 * Empties the tasks queue of this worker node.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure threshold.
 *
 * @returns `true` when the tasks queue size is greater than or equal to `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const threshold = this.tasksQueueBackPressureSize
  return queuedTasks >= threshold
}
/**
 * Resets the usage statistics of this worker node.
 *
 * The worker usage is replaced by a freshly initialized one and every per task function
 * usage entry is discarded.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  const { taskFunctionsUsage } = this
  taskFunctionsUsage.clear()
}
/**
 * Terminates the worker node.
 *
 * Steps visible here: build a promise around a one-shot exit handler of the worker, close the
 * message channel, remove every listener registered on this node, then stop the worker
 * according to its type (thread: unref then awaited terminate; cluster: one-shot disconnect
 * handler then disconnect).
 *
 * NOTE(review): the handler bodies, the end of each switch case and the tail of the method
 * are not visible in this view; presumably waitWorkerExit is awaited at the end - confirm.
 */
131 public async terminate (): Promise
<void> {
// The exit handler is registered before the worker is asked to stop.
132 const waitWorkerExit
= new Promise
<void>(resolve
=> {
133 this.registerOnceWorkerEventHandler('exit', () => {
137 this.closeMessageChannel()
138 this.removeAllListeners()
139 switch (this.info
.type) {
140 case WorkerTypes
.thread
:
// Optional calls: the methods are only invoked when the worker defines them.
141 this.worker
.unref
?.()
142 await this.worker
.terminate
?.()
144 case WorkerTypes
.cluster
:
145 this.registerOnceWorkerEventHandler('disconnect', () => {
148 this.worker
.disconnect
?.()
/**
 * Registers a handler on the underlying worker by forwarding to worker.on(event, handler).
 *
 * NOTE(review): the declaration of the event parameter and the return type are not visible
 * in this view - confirm against the complete file.
 */
155 public registerWorkerEventHandler (
157 handler
: EventHandler
<Worker
>
159 this.worker
.on(event
, handler
)
/**
 * Registers a one-shot handler on the underlying worker by forwarding to
 * worker.once(event, handler).
 *
 * NOTE(review): the declaration of the event parameter and the return type are not visible
 * in this view - confirm against the complete file.
 */
163 public registerOnceWorkerEventHandler (
165 handler
: EventHandler
<Worker
>
167 this.worker
.once(event
, handler
)
/**
 * Returns the usage statistics tracked for the given task function name, creating the entry
 * with initTaskFunctionWorkerUsage on first access.
 *
 * When name equals DEFAULT_TASK_NAME it is replaced by the name of the entry at index 1 of
 * info.taskFunctionsProperties before the lookup.
 *
 * NOTE(review): the statements surrounding the two message templates are not visible in this
 * view; presumably each message is thrown as an error when the task function properties list
 * is not an array, respectively has fewer than 3 elements - confirm.
 */
171 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// First precondition: the task function properties list must be an array.
172 if (!Array.isArray(this.info
.taskFunctionsProperties
)) {
174 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
// Second precondition: the list must hold at least 3 elements.
178 Array.isArray(this.info
.taskFunctionsProperties
) &&
179 this.info
.taskFunctionsProperties
.length
< 3
182 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
// Resolve the default task name to the name stored at index 1 of the properties list.
185 if (name
=== DEFAULT_TASK_NAME
) {
186 name
= this.info
.taskFunctionsProperties
[1].name
// Lazily create the usage entry the first time a name is requested.
188 if (!this.taskFunctionsUsage
.has(name
)) {
189 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
191 return this.taskFunctionsUsage
.get(name
)
/**
 * Drops the usage statistics tracked for the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` when an entry existed and was removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const removed = this.taskFunctionsUsage.delete(name)
  return removed
}
/**
 * Releases the message channel, if any.
 *
 * Both ports are unreferenced, then closed, and the `messageChannel` property is removed
 * from this node. Does nothing when no message channel is set.
 */
private closeMessageChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Unreference both ports first, then close them, in the same order as before.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Builds the initial worker info for the given worker.
 *
 * Visible fields: id, obtained from getWorkerId(worker), and type, obtained from
 * getWorkerType(worker) with a non-null assertion.
 *
 * NOTE(review): the remaining fields of the returned object are not visible in this view -
 * confirm against the complete file.
 */
209 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
211 id
: getWorkerId(worker
),
212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
213 type: getWorkerType(worker
)!,
/**
 * Builds a fresh worker usage object.
 *
 * The queued and maxQueued members are getters that read the live size and maxSize of the
 * tasks queue through closures, so they reflect the queue state at read time rather than a
 * snapshot. sequentiallyStolen starts at 0 and four history members each get a new
 * CircularArray of numbers.
 *
 * NOTE(review): the nesting of the returned object (which sections own the getters, the
 * counter and each history) is not visible in this view - confirm against the complete file.
 */
220 private initWorkerUsage (): WorkerUsage
{
// Closures over this, so the getters below can be defined inside an object literal.
221 const getTasksQueueSize
= (): number => {
222 return this.tasksQueue
.size
224 const getTasksQueueMaxSize
= (): number => {
225 return this.tasksQueue
.maxSize
231 get
queued (): number {
232 return getTasksQueueSize()
234 get
maxQueued (): number {
235 return getTasksQueueMaxSize()
237 sequentiallyStolen
: 0,
242 history
: new CircularArray
<number>()
245 history
: new CircularArray
<number>()
249 history
: new CircularArray
<number>()
252 history
: new CircularArray
<number>()
/**
 * Builds a fresh usage object for the task function with the given name.
 *
 * The queued member is a getter that counts, at read time, the tasks of the tasks queue that
 * belong to this task function: tasks named DEFAULT_TASK_NAME count when the given name equals
 * the name at index 1 of info.taskFunctionsProperties, other tasks count when their name equals
 * the given name. sequentiallyStolen starts at 0 and four history members each get a new
 * CircularArray of numbers.
 *
 * NOTE(review): the nesting of the returned object is not visible in this view - confirm
 * against the complete file.
 */
258 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Walks the whole queue on every call, so reading queued is linear in the queue length.
259 const getTaskFunctionQueueSize
= (): number => {
260 let taskFunctionQueueSize
= 0
261 for (const task
of this.tasksQueue
) {
// Case 1: the task uses the default name and this usage is the one of the entry at index 1.
263 (task
.name
=== DEFAULT_TASK_NAME
&&
264 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
265 name
=== this.info
.taskFunctionsProperties
![1].name
) ||
// Case 2: the task carries an explicit name that matches this usage.
266 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
268 ++taskFunctionQueueSize
271 return taskFunctionQueueSize
277 get
queued (): number {
278 return getTaskFunctionQueueSize()
280 sequentiallyStolen
: 0,
285 history
: new CircularArray
<number>()
288 history
: new CircularArray
<number>()
292 history
: new CircularArray
<number>()
295 history
: new CircularArray
<number>()