1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
5 import { DEFAULT_TASK_NAME
} from
'../utils'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
// The worker wrapped by this node.
// NOTE(review): presumably assigned from the constructor's `worker` argument; that assignment is not visible in this excerpt — confirm.
24 public readonly worker
: Worker
// Worker information, built by initWorkerInfo() in the constructor.
26 public readonly info
: WorkerInfo
// Message channel; created in the constructor only when the worker type is WorkerTypes.thread, and deleted by closeChannel().
28 public messageChannel
?: MessageChannel
// Worker-level usage statistics; (re)created by initWorkerUsage() in the constructor and in resetUsage().
30 public usage
: WorkerUsage
// Per-task usage statistics keyed by task name; entries are created on demand by getTaskWorkerUsage() and cleared by resetUsage().
31 private readonly tasksUsage
: Map
<string, WorkerUsage
>
// Queue holding the tasks enqueued on this worker node.
32 private readonly tasksQueue
: Queue
<Task
<Data
>>
// Queue size at or above which hasBackPressure() reports true; set to poolMaxSize squared in the constructor.
33 private readonly tasksQueueBackPressureSize
: number
36 * Constructs a new worker node.
38 * @param worker - The worker.
39 * @param workerType - The worker type.
40 * @param poolMaxSize - The pool maximum size.
42 constructor (worker
: Worker
, workerType
: WorkerType
, poolMaxSize
: number) {
// NOTE(review): the statement on original line 43 is not visible in this excerpt; presumably it stores `worker` on the instance — confirm.
// Build the worker information from the worker and its type.
44 this.info
= this.initWorkerInfo(worker
, workerType
)
// A message channel is only created for thread workers.
45 if (workerType
=== WorkerTypes
.thread
) {
46 this.messageChannel
= new MessageChannel()
// Worker-level usage statistics start from a fresh usage object.
48 this.usage
= this.initWorkerUsage()
// Per-task usage entries are added later, on demand, by getTaskWorkerUsage().
49 this.tasksUsage
= new Map
<string, WorkerUsage
>()
50 this.tasksQueue
= new Queue
<Task
<Data
>>()
// Back pressure threshold is the square of the pool maximum size; it is compared against the queue size in hasBackPressure().
51 this.tasksQueueBackPressureSize
= Math.pow(poolMaxSize
, 2)
/**
 * Gets the current size of this worker node's tasks queue.
 *
 * @returns The value of the tasks queue `size` property.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Tasks queue maximum size.
 *
 * @returns The tasks queue maximum size, as reported by the queue's `maxSize` property.
 */
private tasksQueueMaxSize (): number {
  const { maxSize } = this.tasksQueue
  return maxSize
}
/**
 * Adds a task to this worker node's tasks queue.
 *
 * @param task - The task to enqueue.
 * @returns The number returned by the queue's `enqueue()` call.
 */
public enqueueTask (task: Task<Data>): number {
  const enqueueResult = this.tasksQueue.enqueue(task)
  return enqueueResult
}
/**
 * Removes a task from this worker node's tasks queue.
 *
 * @returns The task returned by the queue's `dequeue()` call, or `undefined` when it returns none.
 */
public dequeueTask (): Task<Data> | undefined {
  const dequeuedTask = this.tasksQueue.dequeue()
  return dequeuedTask
}
/**
 * Clears this worker node's tasks queue.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether this worker node is under back pressure.
 *
 * @returns `true` when the tasks queue size has reached or exceeded the back pressure size, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueueSize()
  return queuedTasks >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics of this worker node.
 *
 * The worker-level usage is replaced by a freshly initialized one and
 * every per-task usage entry is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.tasksUsage.clear()
}
/**
 * Closes the message channel of this worker node, if there is one.
 *
 * Both ports are unreferenced, then both are closed, and finally the
 * `messageChannel` property is deleted from the node. Does nothing when
 * no message channel is set.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Same call order as before: unref both ports first, then close both.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics tracked for the given task name, creating and
 * caching an entry in the per-task usage map on first request.
 *
 * @param name - The task function name.
 * @returns The usage entry stored in the per-task usage map for the resolved name.
 */
106 public getTaskWorkerUsage (name
: string): WorkerUsage
| undefined {
// The task function names list must already be an array.
// NOTE(review): the statement consuming the message below is not visible in this excerpt; presumably an error is thrown with it — confirm.
107 if (!Array.isArray(this.info
.taskFunctions
)) {
109 `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
// When the default task name is requested and more than one task function is listed,
// the name is replaced by the entry at index 1 of the task function names list.
113 name
=== DEFAULT_TASK_NAME
&&
114 Array.isArray(this.info
.taskFunctions
) &&
115 this.info
.taskFunctions
.length
> 1
117 name
= this.info
.taskFunctions
[1]
// Lazily create the usage entry for this name the first time it is requested.
119 if (!this.tasksUsage
.has(name
)) {
120 this.tasksUsage
.set(name
, this.initTaskWorkerUsage(name
))
122 return this.tasksUsage
.get(name
)
/**
 * Builds the worker information for the given worker.
 *
 * @param worker - The worker.
 * @param workerType - The worker type.
 * @returns The worker information.
 */
125 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
// The id is resolved by getWorkerId() from the worker and its type.
// NOTE(review): the remaining fields of the returned object are not visible in this excerpt.
127 id
: this.getWorkerId(worker
, workerType
),
/**
 * Builds a fresh worker usage object for this worker node.
 *
 * @returns The initialized worker usage.
 */
134 private initWorkerUsage (): WorkerUsage
{
// Closures over `this` so the getters below read the live queue size and maximum size on every access.
135 const getTasksQueueSize
= (): number => {
136 return this.tasksQueueSize()
138 const getTasksQueueMaxSize
= (): number => {
139 return this.tasksQueueMaxSize()
// `queued` and `maxQueued` are getters delegating to the closures above.
// NOTE(review): the object enclosing these getters is not visible in this excerpt.
145 get
queued (): number {
146 return getTasksQueueSize()
148 get
maxQueued (): number {
149 return getTasksQueueMaxSize()
// Each `history` below gets its own new CircularArray.
// NOTE(review): the statistics owning these histories are not visible in this excerpt.
154 history
: new CircularArray()
157 history
: new CircularArray()
161 history
: new CircularArray()
164 history
: new CircularArray()
/**
 * Builds a fresh usage object for the task with the given name.
 *
 * @param name - The task name.
 * @returns The initialized task worker usage.
 */
170 private initTaskWorkerUsage (name
: string): WorkerUsage
{
// Closure that walks the tasks queue and looks at the tasks whose name equals `name`.
// NOTE(review): the counter update and the return are not visible in this excerpt; presumably it counts the matching tasks — confirm.
171 const getTaskQueueSize
= (): number => {
172 let taskQueueSize
= 0
173 for (const task
of this.tasksQueue
) {
174 if (task
.name
=== name
) {
// `queued` is a getter delegating to the closure above, so it is recomputed on every access.
184 get
queued (): number {
185 return getTaskQueueSize()
// Each `history` below gets its own new CircularArray.
// NOTE(review): the statistics owning these histories are not visible in this excerpt.
190 history
: new CircularArray()
193 history
: new CircularArray()
197 history
: new CircularArray()
200 history
: new CircularArray()
207 * Gets the worker id.
209 * @param worker - The worker.
210 * @param workerType - The worker type.
211 * @returns The worker id.
213 private getWorkerId (
215 workerType
: WorkerType
216 ): number | undefined {
217 if (workerType
=== WorkerTypes
.thread
) {
218 return worker
.threadId
219 } else if (workerType
=== WorkerTypes
.cluster
) {