1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
21 implements IWorkerNode
<Worker
, Data
> {
22 public readonly worker
: Worker
23 public readonly info
: WorkerInfo
24 public usage
: WorkerUsage
25 private readonly tasksUsage
: Map
<string, WorkerUsage
>
26 private readonly tasksQueue
: Queue
<Task
<Data
>>
29 * Constructs a new worker node.
31 * @param worker - The worker.
32 * @param workerType - The worker type.
34 constructor (worker
: Worker
, workerType
: WorkerType
) {
36 this.info
= this.initWorkerInfo(worker
, workerType
)
37 this.usage
= this.initWorkerUsage()
38 this.tasksUsage
= new Map
<string, WorkerUsage
>()
39 this.tasksQueue
= new Queue
<Task
<Data
>>()
43 public tasksQueueSize (): number {
44 return this.tasksQueue
.size
48 * Tasks queue maximum size.
50 * @returns The tasks queue maximum size.
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue
.maxSize
57 public enqueueTask (task
: Task
<Data
>): number {
58 return this.tasksQueue
.enqueue(task
)
62 public dequeueTask (): Task
<Data
> | undefined {
63 return this.tasksQueue
.dequeue()
67 public clearTasksQueue (): void {
68 this.tasksQueue
.clear()
72 public resetUsage (): void {
73 this.usage
= this.initWorkerUsage()
74 this.tasksUsage
.clear()
78 public closeChannel (): void {
79 if (this.info
.messageChannel
!= null) {
80 this.info
.messageChannel
?.port1
.unref()
81 this.info
.messageChannel
?.port2
.unref()
82 this.info
.messageChannel
?.port1
.close()
83 this.info
.messageChannel
?.port2
.close()
84 delete this.info
.messageChannel
89 public getTaskWorkerUsage (name
: string): WorkerUsage
| undefined {
90 if (!this.tasksUsage
.has(name
)) {
91 this.tasksUsage
.set(name
, this.initTaskWorkerUsage(name
))
93 return this.tasksUsage
.get(name
)
96 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
98 id
: this.getWorkerId(worker
, workerType
),
102 ...(workerType
=== WorkerTypes
.thread
&& {
103 messageChannel
: new MessageChannel()
108 private initWorkerUsage (): WorkerUsage
{
109 const getTasksQueueSize
= (): number => {
110 return this.tasksQueueSize()
112 const getTasksQueueMaxSize
= (): number => {
113 return this.tasksQueueMaxSize()
119 get
queued (): number {
120 return getTasksQueueSize()
122 get
maxQueued (): number {
123 return getTasksQueueMaxSize()
128 history
: new CircularArray()
131 history
: new CircularArray()
135 history
: new CircularArray()
138 history
: new CircularArray()
144 private initTaskWorkerUsage (name
: string): WorkerUsage
{
145 const getTaskQueueSize
= (): number => {
146 let taskQueueSize
= 0
147 for (const task
of this.tasksQueue
) {
148 if (task
.name
=== name
) {
158 get
queued (): number {
159 return getTaskQueueSize()
164 history
: new CircularArray()
167 history
: new CircularArray()
171 history
: new CircularArray()
174 history
: new CircularArray()
181 * Gets the worker id.
183 * @param worker - The worker.
184 * @param workerType - The worker type.
185 * @returns The worker id.
187 private getWorkerId (
189 workerType
: WorkerType
190 ): number | undefined {
191 if (workerType
=== WorkerTypes
.thread
) {
192 return worker
.threadId
193 } else if (workerType
=== WorkerTypes
.cluster
) {