1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
5 import { DEFAULT_TASK_NAME
} from
'../utils'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
23 public readonly worker
: Worker
24 public readonly info
: WorkerInfo
25 public usage
: WorkerUsage
26 private readonly tasksUsage
: Map
<string, WorkerUsage
>
27 private readonly tasksQueue
: Queue
<Task
<Data
>>
30 * Constructs a new worker node.
32 * @param worker - The worker.
33 * @param workerType - The worker type.
35 constructor (worker
: Worker
, workerType
: WorkerType
) {
37 this.info
= this.initWorkerInfo(worker
, workerType
)
38 this.usage
= this.initWorkerUsage()
39 this.tasksUsage
= new Map
<string, WorkerUsage
>()
40 this.tasksQueue
= new Queue
<Task
<Data
>>()
44 public tasksQueueSize (): number {
45 return this.tasksQueue
.size
49 * Tasks queue maximum size.
51 * @returns The tasks queue maximum size.
53 private tasksQueueMaxSize (): number {
54 return this.tasksQueue
.maxSize
58 public enqueueTask (task
: Task
<Data
>): number {
59 return this.tasksQueue
.enqueue(task
)
63 public dequeueTask (): Task
<Data
> | undefined {
64 return this.tasksQueue
.dequeue()
68 public clearTasksQueue (): void {
69 this.tasksQueue
.clear()
73 public resetUsage (): void {
74 this.usage
= this.initWorkerUsage()
75 this.tasksUsage
.clear()
79 public closeChannel (): void {
80 if (this.info
.messageChannel
!= null) {
81 this.info
.messageChannel
?.port1
.unref()
82 this.info
.messageChannel
?.port2
.unref()
83 this.info
.messageChannel
?.port1
.close()
84 this.info
.messageChannel
?.port2
.close()
85 delete this.info
.messageChannel
90 public getTaskWorkerUsage (name
: string): WorkerUsage
| undefined {
91 if (name
=== DEFAULT_TASK_NAME
&& !Array.isArray(this.info
.taskFunctions
)) {
93 'Cannot get task worker usage for default task function name when task function names list is not yet defined'
97 name
=== DEFAULT_TASK_NAME
&&
98 Array.isArray(this.info
.taskFunctions
) &&
99 this.info
.taskFunctions
.length
> 1
101 name
= this.info
.taskFunctions
[1]
103 if (!this.tasksUsage
.has(name
)) {
104 this.tasksUsage
.set(name
, this.initTaskWorkerUsage(name
))
106 return this.tasksUsage
.get(name
)
109 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
111 id
: this.getWorkerId(worker
, workerType
),
115 ...(workerType
=== WorkerTypes
.thread
&& {
116 messageChannel
: new MessageChannel()
121 private initWorkerUsage (): WorkerUsage
{
122 const getTasksQueueSize
= (): number => {
123 return this.tasksQueueSize()
125 const getTasksQueueMaxSize
= (): number => {
126 return this.tasksQueueMaxSize()
132 get
queued (): number {
133 return getTasksQueueSize()
135 get
maxQueued (): number {
136 return getTasksQueueMaxSize()
141 history
: new CircularArray()
144 history
: new CircularArray()
148 history
: new CircularArray()
151 history
: new CircularArray()
157 private initTaskWorkerUsage (name
: string): WorkerUsage
{
158 const getTaskQueueSize
= (): number => {
159 let taskQueueSize
= 0
160 for (const task
of this.tasksQueue
) {
161 if (task
.name
=== name
) {
171 get
queued (): number {
172 return getTaskQueueSize()
177 history
: new CircularArray()
180 history
: new CircularArray()
184 history
: new CircularArray()
187 history
: new CircularArray()
194 * Gets the worker id.
196 * @param worker - The worker.
197 * @param workerType - The worker type.
198 * @returns The worker id.
200 private getWorkerId (
202 workerType
: WorkerType
203 ): number | undefined {
204 if (workerType
=== WorkerTypes
.thread
) {
205 return worker
.threadId
206 } else if (workerType
=== WorkerTypes
.cluster
) {