a5482dcd29f6778b69c70bf322c2370be13918b3
1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
5 import { DEFAULT_TASK_NAME
} from
'../utils'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
23 public readonly worker
: Worker
24 public readonly info
: WorkerInfo
25 public messageChannel
?: MessageChannel
26 public usage
: WorkerUsage
27 private readonly tasksUsage
: Map
<string, WorkerUsage
>
28 private readonly tasksQueue
: Queue
<Task
<Data
>>
31 * Constructs a new worker node.
33 * @param worker - The worker.
34 * @param workerType - The worker type.
// Builds a worker node: derives the worker info from the worker and its type,
// then sets up the usage statistics, the per-task usage map and the tasks queue.
// NOTE(review): not every original statement of this constructor is visible here
// (e.g. the assignment of the `worker` field) — confirm against the full file.
36 constructor (worker
: Worker
, workerType
: WorkerType
) {
38 this.info
= this.initWorkerInfo(worker
, workerType
)
// A message channel is only created when the worker type is WorkerTypes.thread.
39 if (workerType
=== WorkerTypes
.thread
) {
40 this.messageChannel
= new MessageChannel()
42 this.usage
= this.initWorkerUsage()
// Per task function name usage statistics; starts empty.
43 this.tasksUsage
= new Map
<string, WorkerUsage
>()
44 this.tasksQueue
= new Queue
<Task
<Data
>>()
/**
 * Reports the current size of this worker node's tasks queue.
 *
 * @returns The `size` value exposed by the underlying tasks queue.
 */
public tasksQueueSize (): number {
  const { tasksQueue } = this
  return tasksQueue.size
}
/**
 * Tasks queue maximum size, as tracked by the underlying tasks queue.
 *
 * @returns The tasks queue maximum size.
 */
private tasksQueueMaxSize (): number {
  const { tasksQueue } = this
  return tasksQueue.maxSize
}
/**
 * Adds a task to this worker node's tasks queue.
 *
 * @param task - The task to enqueue.
 * @returns The number returned by the queue's `enqueue()` call
 * (presumably the resulting queue size — confirm against `Queue`).
 */
public enqueueTask (task: Task<Data>): number {
  const enqueueResult = this.tasksQueue.enqueue(task)
  return enqueueResult
}
/**
 * Removes a task from this worker node's tasks queue.
 *
 * @returns The task handed back by the queue's `dequeue()` call, or `undefined` when it yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const nextTask = this.tasksQueue.dequeue()
  return nextTask
}
/**
 * Clears this worker node's tasks queue by delegating to the queue's `clear()`.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Resets the usage statistics of this worker node: the node-level usage is
 * rebuilt from scratch and every per-task usage entry is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.tasksUsage.clear()
}
/**
 * Tears down the message channel of this worker node, if one exists.
 *
 * Both ports are unreferenced first, then both are closed, and finally the
 * `messageChannel` property is deleted. Does nothing when no channel is set.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const ports = [channel.port1, channel.port2]
  // Same call order as before: unref port1, unref port2, close port1, close port2.
  for (const port of ports) {
    port.unref()
  }
  for (const port of ports) {
    port.close()
  }
  delete this.messageChannel
}
// Returns the usage statistics tracked for the task function `name`,
// creating and storing the entry in `tasksUsage` on first access.
94 public getTaskWorkerUsage (name
: string): WorkerUsage
| undefined {
// Guard: `this.info.taskFunctions` must already be an array.
// NOTE(review): the statement consuming the message below is not visible here —
// presumably an error is thrown with it; confirm against the full file.
95 if (!Array.isArray(this.info
.taskFunctions
)) {
97 `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
// When the default task name is requested and more than one task function is
// listed, the name at index 1 of the list is used instead.
// NOTE(review): presumably index 0 holds the default task name itself — confirm.
101 name
=== DEFAULT_TASK_NAME
&&
102 Array.isArray(this.info
.taskFunctions
) &&
103 this.info
.taskFunctions
.length
> 1
105 name
= this.info
.taskFunctions
[1]
// Lazily create the per-task usage entry before returning it.
107 if (!this.tasksUsage
.has(name
)) {
108 this.tasksUsage
.set(name
, this.initTaskWorkerUsage(name
))
110 return this.tasksUsage
.get(name
)
// Builds the WorkerInfo for the given worker and worker type.
// The `id` field is obtained from getWorkerId(worker, workerType).
// NOTE(review): any other fields of the returned object are not visible here.
113 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
115 id
: this.getWorkerId(worker
, workerType
),
// Builds a fresh node-level WorkerUsage object.
// NOTE(review): only part of the returned object is visible here; the fields
// that own the `history` entries below cannot be identified from this view.
122 private initWorkerUsage (): WorkerUsage
{
// Arrow-function wrappers around the node's queue size accessors, so the
// getters below can call them without referring to `this` themselves.
123 const getTasksQueueSize
= (): number => {
124 return this.tasksQueueSize()
126 const getTasksQueueMaxSize
= (): number => {
127 return this.tasksQueueMaxSize()
// `queued` and `maxQueued` are getters: each read re-evaluates the queue size / maximum size.
133 get
queued (): number {
134 return getTasksQueueSize()
136 get
maxQueued (): number {
137 return getTasksQueueMaxSize()
// Four separate `history` fields, each initialized with its own new CircularArray.
142 history
: new CircularArray()
145 history
: new CircularArray()
149 history
: new CircularArray()
152 history
: new CircularArray()
// Builds a fresh WorkerUsage object scoped to the task function `name`.
// NOTE(review): only part of the body and of the returned object is visible here.
158 private initTaskWorkerUsage (name
: string): WorkerUsage
{
// Walks the node's tasks queue looking at tasks whose `name` equals the
// requested one, starting from a counter of 0.
// NOTE(review): the loop body and return are not visible — presumably the
// counter is incremented per match and returned; confirm against the full file.
159 const getTaskQueueSize
= (): number => {
160 let taskQueueSize
= 0
161 for (const task
of this.tasksQueue
) {
162 if (task
.name
=== name
) {
// `queued` is a getter: each read re-runs the per-task queue scan above.
172 get
queued (): number {
173 return getTaskQueueSize()
// Four separate `history` fields, each initialized with its own new CircularArray.
178 history
: new CircularArray()
181 history
: new CircularArray()
185 history
: new CircularArray()
188 history
: new CircularArray()
195 * Gets the worker id.
197 * @param worker - The worker.
198 * @param workerType - The worker type.
199 * @returns The worker id.
201 private getWorkerId (
203 workerType
: WorkerType
204 ): number | undefined {
205 if (workerType
=== WorkerTypes
.thread
) {
206 return worker
.threadId
207 } else if (workerType
=== WorkerTypes
.cluster
) {