1 import { MessageChannel
} from
'node:worker_threads'
2 import { EventEmitter
} from
'node:events'
3 import { CircularArray
} from
'../circular-array.js'
4 import type { Task
} from
'../utility-types.js'
5 import { DEFAULT_TASK_NAME
, getWorkerId
, getWorkerType
} from
'../utils.js'
6 import { Deque
} from
'../deque.js'
13 type WorkerNodeOptions
,
18 import { checkWorkerNodeArguments
, createWorker
} from
'./utils.js'
23 * @typeParam Worker - Type of worker.
24 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
26 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
28 implements IWorkerNode
<Worker
, Data
> {
// Underlying worker instance; assigned once in the constructor from createWorker().
30 public readonly worker
: Worker
// Worker information; assigned once in the constructor from initWorkerInfo().
32 public readonly info
: WorkerInfo
// Usage statistics for this worker; replaced wholesale by resetUsage().
34 public usage
: WorkerUsage
// Optional strategy data. NOTE(review): never read or written inside this class
// as far as this extract shows — presumably maintained by external code; confirm.
36 public strategyData
?: StrategyData
// Message channel; only created when info.type is WorkerTypes.thread and
// removed again by closeMessageChannel().
38 public messageChannel
?: MessageChannel
// Queue size threshold used by hasBackPressure() (size >= threshold).
40 public tasksQueueBackPressureSize
: number
// Queue of tasks waiting on this worker node.
41 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Guard flag: true only while a 'backPressure' event is being emitted.
42 private onBackPressureStarted
: boolean
// Per task function usage, keyed by task function name; filled lazily by
// getTaskFunctionWorkerUsage().
43 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
46 * Constructs a new worker node.
48 * @param type - The worker type.
49 * @param filePath - Path to the worker file.
50 * @param opts - The worker node options.
// Builds the worker node: validates the arguments, creates the worker, then
// initializes info, usage, the optional message channel and the tasks queue.
// NOTE(review): some original lines of this constructor are not visible in this
// extract (e.g. around the createWorker() options literal) — confirm against the
// full file before relying on this description.
52 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
// Argument validation is delegated to the helper imported from './utils.js'.
54 checkWorkerNodeArguments(type, filePath
, opts
)
55 this.worker
= createWorker
<Worker
>(type, filePath
, {
57 workerOptions
: opts
.workerOptions
59 this.info
= this.initWorkerInfo(this.worker
)
// initWorkerUsage() only captures closures over this.tasksQueue; they are
// evaluated on access, so the queue may safely be created further down.
60 this.usage
= this.initWorkerUsage()
// A message channel is only created for thread workers.
61 if (this.info
.type === WorkerTypes
.thread
) {
62 this.messageChannel
= new MessageChannel()
// The non-null assertion below assumes opts.tasksQueueBackPressureSize is set
// by the caller — presumably enforced by checkWorkerNodeArguments(); confirm.
64 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
65 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
!
66 this.tasksQueue
= new Deque
<Task
<Data
>>()
67 this.onBackPressureStarted
= false
68 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Reports how many tasks are currently held in this worker node's tasks queue.
 *
 * @returns The `size` of the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Adds a task to the tasks queue through the queue's `push()` operation.
 *
 * When the queue has reached `tasksQueueBackPressureSize` and no emission is
 * already in progress, a `backPressure` event carrying this worker's id is
 * emitted.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()` (the tasks queue size).
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.push(task)
  if (this.hasBackPressure() && !this.onBackPressureStarted) {
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id })
    } finally {
      // Always release the guard: a listener that throws would otherwise leave
      // the flag stuck at true and silence every later back-pressure event.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Adds a task to the tasks queue through the queue's `unshift()` operation.
 *
 * When the queue has reached `tasksQueueBackPressureSize` and no emission is
 * already in progress, a `backPressure` event carrying this worker's id is
 * emitted.
 *
 * @param task - The task to insert.
 * @returns The value returned by the queue's `unshift()` (the tasks queue size).
 */
public unshiftTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.unshift(task)
  if (this.hasBackPressure() && !this.onBackPressureStarted) {
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id })
    } finally {
      // Always release the guard: a listener that throws would otherwise leave
      // the flag stuck at true and silence every later back-pressure event.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Removes a task from the tasks queue through the queue's `shift()` operation.
 *
 * @returns The task handed back by `shift()`, or `undefined` if it yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const task = this.tasksQueue.shift()
  return task
}
/**
 * Removes a task from the tasks queue through the queue's `pop()` operation.
 *
 * @returns The task handed back by `pop()`, or `undefined` if it yields none.
 */
public popTask (): Task<Data> | undefined {
  const task = this.tasksQueue.pop()
  return task
}
/**
 * Empties this worker node's tasks queue by calling the queue's `clear()`.
 */
public clearTasksQueue (): void {
  const queue = this.tasksQueue
  queue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure threshold.
 *
 * @returns `true` when the queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const { size: queuedTasks } = this.tasksQueue
  return queuedTasks >= this.tasksQueueBackPressureSize
}
/**
 * Resets every usage statistic of this worker node: the per task function
 * entries are dropped and the worker usage is replaced by a fresh one.
 */
public resetUsage (): void {
  // The two steps are independent of each other.
  this.taskFunctionsUsage.clear()
  this.usage = this.initWorkerUsage()
}
// Terminates the worker: closes the message channel, removes listeners, then
// stops the worker in a way that depends on its type.
// NOTE(review): several original lines of this method are not visible in this
// extract (the bodies of both event handlers, any `break` statements and any
// final await of waitWorkerExit) — confirm against the full file.
125 public async terminate (): Promise
<void> {
// Promise tied to a one-shot 'exit' handler registered on the worker;
// presumably the handler resolves it — confirm.
126 const waitWorkerExit
= new Promise
<void>(resolve
=> {
127 this.registerOnceWorkerEventHandler('exit', () => {
131 this.closeMessageChannel()
// removeAllListeners() is not defined in this extract — presumably inherited
// from EventEmitter (imported at the top of the file); confirm.
132 this.removeAllListeners()
133 switch (this.info
.type) {
134 case WorkerTypes
.thread
:
// terminate is called optionally (?.) and awaited.
135 await this.worker
.terminate
?.()
137 case WorkerTypes
.cluster
:
// A one-shot 'disconnect' handler is registered before disconnect is requested.
138 this.registerOnceWorkerEventHandler('disconnect', () => {
141 this.worker
.disconnect
?.()
// Registers `handler` on the underlying worker with worker.on(), i.e. without
// the one-shot behavior used by registerOnceWorkerEventHandler().
// NOTE(review): the declaration of the `event` parameter is not visible in this
// extract — confirm its type against the full file.
148 public registerWorkerEventHandler (
150 handler
: EventHandler
<Worker
>
152 this.worker
.on(event
, handler
)
// Registers `handler` on the underlying worker with worker.once().
// Used by terminate() for the 'exit' and 'disconnect' events.
// NOTE(review): the declaration of the `event` parameter is not visible in this
// extract — confirm its type against the full file.
156 public registerOnceWorkerEventHandler (
158 handler
: EventHandler
<Worker
>
160 this.worker
.once(event
, handler
)
/**
 * Returns the usage statistics tracked for one task function, creating and
 * storing the entry on first access.
 *
 * @param name - The task function name. `DEFAULT_TASK_NAME` is resolved to the
 * second entry of `this.info.taskFunctionNames`.
 * @returns The worker usage registered for the resolved task function name.
 * @throws Error If the task function names list is not yet defined or has
 * less than 3 elements.
 */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
  const taskFunctionNames = this.info.taskFunctionNames
  if (!Array.isArray(taskFunctionNames)) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
    )
  }
  // The list is known to be an array from here on, so only its length is checked.
  if (taskFunctionNames.length < 3) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
    )
  }
  // Resolve the default alias without reassigning the parameter.
  const taskFunctionName: string =
    name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
  let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
  if (taskFunctionUsage == null) {
    taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
    this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
  }
  return taskFunctionUsage
}
/**
 * Drops the usage entry stored for the given task function name.
 *
 * @param name - The task function name whose usage entry is removed.
 * @returns The result of `Map.delete()`: `true` if an entry existed and was
 * removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const removed = this.taskFunctionsUsage.delete(name)
  return removed
}
/**
 * Releases the message channel when one exists: both ports are unref'ed, then
 * both are closed, and finally the `messageChannel` property is deleted.
 * Does nothing when there is no message channel.
 */
private closeMessageChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const ports = [channel.port1, channel.port2]
  // Same order as before: unref port1 and port2 first, then close them.
  for (const port of ports) {
    port.unref()
  }
  for (const port of ports) {
    port.close()
  }
  delete this.messageChannel
}
// Builds the initial WorkerInfo for the given worker.
// NOTE(review): only the `id` and `type` properties of the returned literal are
// visible in this extract; other properties may exist — confirm.
202 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
// id comes from the getWorkerId() helper imported from '../utils.js'.
204 id
: getWorkerId(worker
),
// The non-null assertion assumes getWorkerType() recognizes this worker.
205 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
206 type: getWorkerType(worker
)!,
// Builds a fresh WorkerUsage object for this worker node.
// NOTE(review): the property names enclosing the fields below are not visible in
// this extract — confirm the exact shape against the WorkerUsage type.
213 private initWorkerUsage (): WorkerUsage
{
// Closures rather than snapshots: the queue is read each time they are called.
214 const getTasksQueueSize
= (): number => {
215 return this.tasksQueue
.size
217 const getTasksQueueMaxSize
= (): number => {
218 return this.tasksQueue
.maxSize
// `queued` is a getter, so it reports the tasks queue size at access time.
224 get
queued (): number {
225 return getTasksQueueSize()
// `maxQueued` is a getter reporting the queue's maxSize at access time.
227 get
maxQueued (): number {
228 return getTasksQueueMaxSize()
230 sequentiallyStolen
: 0,
// Each `history` below gets its own new CircularArray<number> instance.
235 history
: new CircularArray
<number>()
238 history
: new CircularArray
<number>()
242 history
: new CircularArray
<number>()
245 history
: new CircularArray
<number>()
// Builds a fresh WorkerUsage object scoped to the task function `name`.
// NOTE(review): the property names enclosing the fields below and the end of
// this method are not visible in this extract — confirm against the full file.
251 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Counts, at call time, the queued tasks that belong to this task function.
252 const getTaskFunctionQueueSize
= (): number => {
253 let taskFunctionQueueSize
= 0
254 for (const task
of this.tasksQueue
) {
// A task matches when it is named DEFAULT_TASK_NAME and `name` equals
// taskFunctionNames[1], or when it carries a non-default name equal to `name`.
// Presumably this expression is the condition guarding the increment below
// (the `if` line itself is not visible) — confirm.
256 (task
.name
=== DEFAULT_TASK_NAME
&&
257 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
258 name
=== this.info
.taskFunctionNames
![1]) ||
259 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
261 ++taskFunctionQueueSize
264 return taskFunctionQueueSize
// `queued` is a getter, so the count is recomputed on every access.
270 get
queued (): number {
271 return getTaskFunctionQueueSize()
273 sequentiallyStolen
: 0,
// Each `history` below gets its own new CircularArray<number> instance.
278 history
: new CircularArray
<number>()
281 history
: new CircularArray
<number>()
285 history
: new CircularArray
<number>()
288 history
: new CircularArray
<number>()