1 import { MessageChannel
} from
'node:worker_threads'
2 import { EventEmitter
} from
'node:events'
3 import { CircularArray
} from
'../circular-array.js'
4 import type { Task
} from
'../utility-types.js'
5 import { DEFAULT_TASK_NAME
, getWorkerId
, getWorkerType
} from
'../utils.js'
6 import { Deque
} from
'../deque.js'
16 type WorkerNodeOptions
,
21 import { checkWorkerNodeArguments
, createWorker
} from
'./utils.js'
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
31 implements IWorkerNode
<Worker
, Data
> {
// Underlying worker instance; assigned once in the constructor from createWorker().
33 public readonly worker
: Worker
// Worker information; assigned once in the constructor from initWorkerInfo(this.worker).
35 public readonly info
: WorkerInfo
// Worker usage statistics; built by initWorkerUsage() in the constructor and rebuilt by resetUsage().
37 public usage
: WorkerUsage
// Optional strategy data; no code visible in this view assigns it.
39 public strategyData
?: StrategyData
// Only created in the constructor when info.type === WorkerTypes.thread;
// its ports are closed and the property deleted by closeMessageChannel().
41 public messageChannel
?: MessageChannel
// Queue size at or above which hasBackPressure() returns true; copied from opts in the constructor.
43 public tasksQueueBackPressureSize
: number
// Deque holding the tasks queued on this worker node.
44 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Set to true around the 'backPressure' emit in enqueueTask()/unshiftTask();
// the emit is skipped while the flag is set.
45 private onBackPressureStarted
: boolean
// Usage statistics keyed by task function name; filled lazily by getTaskFunctionWorkerUsage().
46 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
49 * Constructs a new worker node.
51 * @param type - The worker type.
52 * @param filePath - Path to the worker file.
53 * @param opts - The worker node options.
// Builds the node: checks the arguments, creates the worker, then initializes info, usage and queue state.
55 constructor (type: WorkerType
, filePath
: string, opts
: WorkerNodeOptions
) {
// The arguments are handed to checkWorkerNodeArguments() before the worker is created.
57 checkWorkerNodeArguments(type, filePath
, opts
)
// NOTE(review): the options object passed to createWorker() is only partially visible in this view.
58 this.worker
= createWorker
<Worker
>(type, filePath
, {
60 workerOptions
: opts
.workerOptions
62 this.info
= this.initWorkerInfo(this.worker
)
63 this.usage
= this.initWorkerUsage()
// A message channel is only created for thread workers.
64 if (this.info
.type === WorkerTypes
.thread
) {
65 this.messageChannel
= new MessageChannel()
67 this.tasksQueueBackPressureSize
= opts
.tasksQueueBackPressureSize
68 this.tasksQueue
= new Deque
<Task
<Data
>>()
69 this.onBackPressureStarted
= false
70 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Reports how many tasks are currently held in this node's tasks queue.
 *
 * @returns The `size` of the tasks queue.
 */
public tasksQueueSize (): number {
  const { size: queuedTasks } = this.tasksQueue
  return queuedTasks
}
// Adds a task with the queue's push() and emits 'backPressure' when the queue has reached the back pressure size.
// NOTE(review): no return statement is visible in this view although the signature returns number —
// presumably tasksQueueSize is returned; confirm.
79 public enqueueTask (task
: Task
<Data
>): number {
// tasksQueueSize holds whatever push() returns.
80 const tasksQueueSize
= this.tasksQueue
.push(task
)
// The flag is raised for the duration of the emit and lowered right after,
// so the emit is skipped whenever this method runs while the flag is set.
81 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
82 this.onBackPressureStarted
= true
83 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
84 this.emit('backPressure', { workerId
: this.info
.id
! })
85 this.onBackPressureStarted
= false
// Adds a task with the queue's unshift() and emits 'backPressure' when the queue has reached the back pressure size.
// NOTE(review): no return statement is visible in this view although the signature returns number —
// presumably tasksQueueSize is returned; confirm.
91 public unshiftTask (task
: Task
<Data
>): number {
// tasksQueueSize holds whatever unshift() returns.
92 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
// The flag is raised for the duration of the emit and lowered right after,
// so the emit is skipped whenever this method runs while the flag is set.
93 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
94 this.onBackPressureStarted
= true
95 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
96 this.emit('backPressure', { workerId
: this.info
.id
! })
97 this.onBackPressureStarted
= false
/**
 * Takes one task out of the tasks queue using the queue's `shift()`.
 *
 * @returns The task handed back by `shift()`, or `undefined` when it yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const shiftedTask = this.tasksQueue.shift()
  return shiftedTask
}
/**
 * Takes one task out of the tasks queue using the queue's `pop()`.
 *
 * @returns The task handed back by `pop()`, or `undefined` when it yields none.
 */
public popTask (): Task<Data> | undefined {
  const poppedTask = this.tasksQueue.pop()
  return poppedTask
}
/**
 * Clears this node's tasks queue by delegating to the queue's `clear()`.
 */
public clearTasksQueue (): void {
  const queue = this.tasksQueue
  queue.clear()
}
/**
 * Tells whether the tasks queue has reached the configured back pressure size.
 *
 * @returns `true` when the queue size is greater than or equal to `tasksQueueBackPressureSize`.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const backPressureThreshold = this.tasksQueueBackPressureSize
  return queuedTasks >= backPressureThreshold
}
/**
 * Replaces the worker usage with a freshly initialized one and forgets
 * every per task function usage entry.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
// Shuts the node down: closes the message channel, removes all listeners registered on this node,
// then terminates (thread) or disconnects (cluster) the underlying worker.
129 public async terminate (): Promise
<void> {
// Promise built around a one-time 'exit' handler on the worker.
// NOTE(review): the handler body and the place where this promise is awaited are not visible in this view —
// presumably it resolves on exit and is awaited last; confirm.
130 const waitWorkerExit
= new Promise
<void>(resolve
=> {
131 this.registerOnceWorkerEventHandler('exit', () => {
135 this.closeMessageChannel()
136 this.removeAllListeners()
137 switch (this.info
.type) {
// Thread worker: terminate() is optional on the worker type, hence the optional call.
138 case WorkerTypes
.thread
:
139 await this.worker
.terminate
?.()
// Cluster worker: a one-time 'disconnect' handler is registered before disconnect() is requested.
// NOTE(review): the 'disconnect' handler body is not visible in this view.
141 case WorkerTypes
.cluster
:
142 this.registerOnceWorkerEventHandler('disconnect', () => {
145 this.worker
.disconnect
?.()
// Attaches a handler to the underlying worker with worker.on().
// NOTE(review): the parameter declarations are only partially visible in this view; the handler type is a
// union of the online, message, error and exit handler types listed below.
152 public registerWorkerEventHandler (
155 | OnlineHandler
<Worker
>
156 | MessageHandler
<Worker
>
157 | ErrorHandler
<Worker
>
158 | ExitHandler
<Worker
>
// Plain delegation to the worker's on().
160 this.worker
.on(event
, handler
)
// Attaches a handler to the underlying worker with worker.once().
// NOTE(review): the parameter declarations are only partially visible in this view; the handler type is a
// union of the online, message, error and exit handler types listed below.
164 public registerOnceWorkerEventHandler (
167 | OnlineHandler
<Worker
>
168 | MessageHandler
<Worker
>
169 | ErrorHandler
<Worker
>
170 | ExitHandler
<Worker
>
// Plain delegation to the worker's once().
172 this.worker
.once(event
, handler
)
// Returns the usage statistics of the given task function, creating the entry on first access.
// NOTE(review): the statements that consume the two message templates below are not visible in this view —
// presumably each guard throws an error carrying that message; confirm.
176 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// Guard: the task function names list must be an array.
177 if (!Array.isArray(this.info
.taskFunctionNames
)) {
179 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Guard: the task function names list must hold at least 3 elements.
// NOTE(review): if the first guard exits the method, the Array.isArray() re-check here is redundant.
183 Array.isArray(this.info
.taskFunctionNames
) &&
184 this.info
.taskFunctionNames
.length
< 3
187 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is resolved to the entry at index 1 of the task function names list.
190 if (name
=== DEFAULT_TASK_NAME
) {
191 name
= this.info
.taskFunctionNames
[1]
// Lazily create the usage entry for this task function name.
193 if (!this.taskFunctionsUsage
.has(name
)) {
194 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
196 return this.taskFunctionsUsage
.get(name
)
/**
 * Drops the usage entry stored for the given task function name.
 *
 * @param name - The task function name whose usage entry is removed.
 * @returns The result of `Map.delete()`: `true` if an entry existed and was removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const wasRemoved = this.taskFunctionsUsage.delete(name)
  return wasRemoved
}
/**
 * Releases the message channel, if one exists: both ports are unref'ed,
 * then closed, and finally the `messageChannel` property is deleted.
 */
private closeMessageChannel (): void {
  const channel = this.messageChannel
  // Nothing to release when no channel is set.
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
// Builds the WorkerInfo for the given worker.
// NOTE(review): only the id and type properties of the returned object are visible in this view.
214 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
// id comes from the imported getWorkerId() helper.
216 id
: getWorkerId(worker
),
// type comes from getWorkerType(); the non-null assertion discards a possibly undefined result.
217 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
218 type: getWorkerType(worker
)!,
// Builds a fresh WorkerUsage for this node.
// NOTE(review): the returned object literal is only partially visible in this view; the properties
// enclosing the getters and the history entries below cannot be named from here.
225 private initWorkerUsage (): WorkerUsage
{
// Closures reading the tasks queue size and max size at call time.
226 const getTasksQueueSize
= (): number => {
227 return this.tasksQueue
.size
229 const getTasksQueueMaxSize
= (): number => {
230 return this.tasksQueue
.maxSize
// queued and maxQueued are getters, so each read goes through the closures above
// and reflects the queue state at that moment.
236 get
queued (): number {
237 return getTasksQueueSize()
239 get
maxQueued (): number {
240 return getTasksQueueMaxSize()
242 sequentiallyStolen
: 0,
// Each history entry below starts with its own new CircularArray<number>.
247 history
: new CircularArray
<number>()
250 history
: new CircularArray
<number>()
254 history
: new CircularArray
<number>()
257 history
: new CircularArray
<number>()
// Builds a fresh WorkerUsage scoped to one task function name.
// NOTE(review): the returned object literal is only partially visible in this view; the properties
// enclosing the getter and the history entries below cannot be named from here.
263 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Closure counting, at call time, the queued tasks that belong to this task function.
264 const getTaskFunctionQueueSize
= (): number => {
265 let taskFunctionQueueSize
= 0
// A task matches when it carries the default task name and `name` equals the entry at index 1 of the
// task function names list, or when it carries a non-default name equal to `name`.
266 for (const task
of this.tasksQueue
) {
268 (task
.name
=== DEFAULT_TASK_NAME
&&
269 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
270 name
=== this.info
.taskFunctionNames
![1]) ||
271 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
273 ++taskFunctionQueueSize
276 return taskFunctionQueueSize
// queued is a getter, so each read walks the queue again through the closure above.
282 get
queued (): number {
283 return getTaskFunctionQueueSize()
285 sequentiallyStolen
: 0,
// Each history entry below starts with its own new CircularArray<number>.
290 history
: new CircularArray
<number>()
293 history
: new CircularArray
<number>()
297 history
: new CircularArray
<number>()
300 history
: new CircularArray
<number>()