1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
5 import { DEFAULT_TASK_NAME
} from
'../utils'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
24 public readonly worker
: Worker
26 public readonly info
: WorkerInfo
28 public messageChannel
?: MessageChannel
30 public usage
: WorkerUsage
32 public tasksQueueBackPressureSize
: number
33 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
34 private readonly tasksQueue
: Queue
<Task
<Data
>>
// Constructor of the worker node. Only part of its original lines is visible
// in this view (the signature opening, the `worker` parameter, the first guard
// condition and the statements that raise most errors are not shown), so the
// notes below describe only what the visible fragments establish.
37 * Constructs a new worker node.
39 * @param worker - The worker.
40 * @param workerType - The worker type.
41 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
45 workerType
: WorkerType
,
46 tasksQueueBackPressureSize
: number
// Argument validation. The condition guarding this throw is not visible;
// presumably a null check on `worker` — confirm against the full file.
49 throw new TypeError('Cannot construct a worker node without a worker')
// `== null` matches both null and undefined.
51 if (workerType
== null) {
53 'Cannot construct a worker node without a worker type'
56 if (tasksQueueBackPressureSize
== null) {
58 'Cannot construct a worker node without a tasks queue back pressure size'
// Number.isSafeInteger also rejects NaN, Infinity and non-number values.
61 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
63 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
// State initialisation: worker info is derived from the worker and its type.
67 this.info
= this.initWorkerInfo(worker
, workerType
)
// A MessageChannel is only created for thread workers; for any other worker
// type `messageChannel` is left unset by the visible code.
68 if (workerType
=== WorkerTypes
.thread
) {
69 this.messageChannel
= new MessageChannel()
// Fresh usage statistics, an empty per-task-function usage map and an empty
// tasks queue; the validated back pressure size is stored last.
71 this.usage
= this.initWorkerUsage()
72 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
73 this.tasksQueue
= new Queue
<Task
<Data
>>()
74 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
/**
 * Reports the current size of this worker node's tasks queue.
 *
 * @returns The value of the tasks queue `size` property.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Tasks queue maximum size.
 *
 * @returns The tasks queue maximum size, read from the queue's `maxSize` property.
 */
private tasksQueueMaxSize (): number {
  const { maxSize } = this.tasksQueue
  return maxSize
}
/**
 * Appends a task to this worker node's tasks queue.
 *
 * @param task - The task to enqueue.
 * @returns The number returned by the queue's `enqueue` call (presumably the resulting queue size - confirm against `Queue`).
 */
public enqueueTask (task: Task<Data>): number {
  const enqueueResult = this.tasksQueue.enqueue(task)
  return enqueueResult
}
/**
 * Removes one task from this worker node's tasks queue.
 *
 * @returns Whatever the queue's `dequeue` call yields: a task, or `undefined`.
 */
public dequeueTask (): Task<Data> | undefined {
  const dequeuedTask = this.tasksQueue.dequeue()
  return dequeuedTask
}
/**
 * Empties this worker node's tasks queue by delegating to the queue's `clear`.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure threshold.
 *
 * @returns `true` when the tasks queue size is greater than or equal to `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const backPressureThreshold = this.tasksQueueBackPressureSize
  return queuedTasks >= backPressureThreshold
}
/**
 * Resets the usage statistics of this worker node.
 *
 * The node-level usage is replaced by a freshly initialised one, then every
 * per-task-function usage entry is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel held by this worker node, if there is one.
 *
 * Both ports are unreferenced, then both are closed, and finally the
 * `messageChannel` property is deleted. Does nothing when no channel is set.
 */
public closeChannel (): void {
  // Narrow once into a local instead of repeating `?.` behind a null guard.
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  // Call order is kept: unref both ports first, then close both.
  channel.port1.unref()
  channel.port2.unref()
  channel.port1.close()
  channel.port2.close()
  delete this.messageChannel
}
// Returns the usage statistics tracked for one task function on this worker
// node, creating the entry on first access. Only part of the method's original
// lines is visible here (the statements raising the two errors are not shown).
//
// @param name - Task function name; DEFAULT_TASK_NAME is resolved to the
//   entry at index 1 of `this.info.taskFunctions`.
// @returns The WorkerUsage stored for the resolved name (typed as possibly
//   undefined because it is read back through `Map.get`).
129 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// Guard: the task function names list must already be an array.
130 if (!Array.isArray(this.info
.taskFunctions
)) {
132 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Guard: the list must hold at least 3 elements.
// NOTE(review): if the previous guard raises, this second Array.isArray is
// redundant at runtime and presumably kept for type narrowing — confirm.
136 Array.isArray(this.info
.taskFunctions
) &&
137 this.info
.taskFunctions
.length
< 3
140 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is an alias for the task function at index 1.
143 if (name
=== DEFAULT_TASK_NAME
) {
144 name
= this.info
.taskFunctions
[1]
// Lazily create the usage entry the first time a name is requested.
146 if (!this.taskFunctionsUsage
.has(name
)) {
147 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
149 return this.taskFunctionsUsage
.get(name
)
// Builds the WorkerInfo for this worker node. Only the `id` field of the
// returned object is visible in this view; any other fields are not shown.
//
// @param worker - The worker, forwarded to getWorkerId.
// @param workerType - The worker type, forwarded to getWorkerId.
// @returns The initial worker info.
152 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
// The id is resolved by getWorkerId from the worker and its type.
154 id
: this.getWorkerId(worker
, workerType
),
// Builds a fresh WorkerUsage for this worker node. Only part of the returned
// object is visible in this view: the `queued` and `maxQueued` getters and
// four `history` fields whose enclosing property names are not shown.
//
// @returns The initial worker usage.
161 private initWorkerUsage (): WorkerUsage
{
// Arrow functions capture the worker node's `this`, so the getters below can
// reach the node's methods; inside an object-literal getter `this` would be
// the literal itself.
162 const getTasksQueueSize
= (): number => {
163 return this.tasksQueueSize()
165 const getTasksQueueMaxSize
= (): number => {
166 return this.tasksQueueMaxSize()
// Live view: every read of `queued` re-evaluates the current queue size.
172 get
queued (): number {
173 return getTasksQueueSize()
// Live view: every read of `maxQueued` re-evaluates the queue maximum size.
175 get
maxQueued (): number {
176 return getTasksQueueMaxSize()
// Each history starts as a new, empty CircularArray.
181 history
: new CircularArray()
184 history
: new CircularArray()
188 history
: new CircularArray()
191 history
: new CircularArray()
// Builds a fresh WorkerUsage scoped to one task function. Only part of the
// returned object is visible in this view: the `queued` getter and four
// `history` fields whose enclosing property names are not shown.
//
// @param name - The task function name the usage is tracked for.
// @returns The initial task function worker usage.
197 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Counts the queued tasks that belong to `name`. It iterates the whole tasks
// queue on every call, so each read of `queued` costs one pass over the queue.
198 const getTaskFunctionQueueSize
= (): number => {
199 let taskFunctionQueueSize
= 0
200 for (const task
of this.tasksQueue
) {
// A task matches when it carries the default task name and `name` is the
// task function at index 1 of the list, or when it carries a non-default
// name equal to `name`.
202 (task
.name
=== DEFAULT_TASK_NAME
&&
203 name
=== (this.info
.taskFunctions
as string[])[1]) ||
204 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
206 ++taskFunctionQueueSize
209 return taskFunctionQueueSize
// Live view: every read of `queued` recounts the matching queued tasks.
215 get
queued (): number {
216 return getTaskFunctionQueueSize()
// Each history starts as a new, empty CircularArray.
221 history
: new CircularArray()
224 history
: new CircularArray()
228 history
: new CircularArray()
231 history
: new CircularArray()
238 * Gets the worker id.
240 * @param worker - The worker.
241 * @param workerType - The worker type.
242 * @returns The worker id.
244 private getWorkerId (
246 workerType
: WorkerType
247 ): number | undefined {
248 if (workerType
=== WorkerTypes
.thread
) {
249 return worker
.threadId
250 } else if (workerType
=== WorkerTypes
.cluster
) {