1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import { Queue
} from
'../queue'
4 import type { Task
} from
'../utility-types'
5 import { DEFAULT_TASK_NAME
} from
'../utils'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
24 public readonly worker
: Worker
26 public readonly info
: WorkerInfo
28 public messageChannel
?: MessageChannel
30 public usage
: WorkerUsage
31 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
32 private readonly tasksQueue
: Queue
<Task
<Data
>>
33 private readonly tasksQueueBackPressureSize
: number
36 * Constructs a new worker node.
38 * @param worker - The worker.
39 * @param workerType - The worker type.
40 * @param poolMaxSize - The pool maximum size.
42 constructor (worker
: Worker
, workerType
: WorkerType
, poolMaxSize
: number) {
44 throw new TypeError('Cannot construct a worker node without a worker')
46 if (workerType
== null) {
48 'Cannot construct a worker node without a worker type'
51 if (poolMaxSize
== null) {
53 'Cannot construct a worker node without a pool maximum size'
56 if (!Number.isSafeInteger(poolMaxSize
)) {
58 'Cannot construct a worker node with a pool maximum size that is not an integer'
62 this.info
= this.initWorkerInfo(worker
, workerType
)
63 if (workerType
=== WorkerTypes
.thread
) {
64 this.messageChannel
= new MessageChannel()
66 this.usage
= this.initWorkerUsage()
67 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
68 this.tasksQueue
= new Queue
<Task
<Data
>>()
69 this.tasksQueueBackPressureSize
= Math.pow(poolMaxSize
, 2)
/**
 * Reports the number of tasks currently held in this worker node's tasks queue.
 *
 * @returns The `size` of the underlying tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Tasks queue maximum size.
 *
 * Reads the `maxSize` property exposed by the underlying tasks queue.
 *
 * @returns The tasks queue maximum size.
 */
private tasksQueueMaxSize (): number {
  const { maxSize } = this.tasksQueue
  return maxSize
}
/**
 * Adds a task to this worker node's tasks queue.
 *
 * @param task - The task to enqueue.
 * @returns The number returned by the queue's `enqueue` call
 * (presumably the resulting queue size - confirm against `Queue`).
 */
public enqueueTask (task: Task<Data>): number {
  const enqueueResult = this.tasksQueue.enqueue(task)
  return enqueueResult
}
/**
 * Removes a task from this worker node's tasks queue.
 *
 * @returns The task handed back by the queue's `dequeue` call, or `undefined`
 * when the queue yields none (presumably when it is empty - confirm against `Queue`).
 */
public dequeueTask (): Task<Data> | undefined {
  const nextTask = this.tasksQueue.dequeue()
  return nextTask
}
/**
 * Empties this worker node's tasks queue by delegating to the queue's `clear` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure threshold.
 *
 * The threshold is `tasksQueueBackPressureSize`, which the constructor sets to
 * the pool maximum size squared.
 *
 * @returns `true` when the tasks queue size is greater than or equal to the
 * back pressure size, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const backPressureThreshold = this.tasksQueueBackPressureSize
  return backPressureThreshold <= queuedTasks
}
/**
 * Resets the usage statistics of this worker node.
 *
 * Replaces `usage` with a freshly initialized worker usage object and then
 * drops every per task function usage entry.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Tears down the message channel of this worker node, if one exists.
 *
 * Both ports are unreferenced first, then both are closed, and finally the
 * `messageChannel` property is deleted from the node. Does nothing when no
 * message channel is set.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const ports = [channel.port1, channel.port2]
  // Keep the original ordering: unref both ports before closing either one.
  for (const port of ports) {
    port.unref()
  }
  for (const port of ports) {
    port.close()
  }
  delete this.messageChannel
}
124 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
125 if (!Array.isArray(this.info
.taskFunctions
)) {
127 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
131 Array.isArray(this.info
.taskFunctions
) &&
132 this.info
.taskFunctions
.length
< 3
135 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
138 if (name
=== DEFAULT_TASK_NAME
) {
139 name
= this.info
.taskFunctions
[1]
141 if (!this.taskFunctionsUsage
.has(name
)) {
142 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
144 return this.taskFunctionsUsage
.get(name
)
147 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
149 id
: this.getWorkerId(worker
, workerType
),
156 private initWorkerUsage (): WorkerUsage
{
157 const getTasksQueueSize
= (): number => {
158 return this.tasksQueueSize()
160 const getTasksQueueMaxSize
= (): number => {
161 return this.tasksQueueMaxSize()
167 get
queued (): number {
168 return getTasksQueueSize()
170 get
maxQueued (): number {
171 return getTasksQueueMaxSize()
176 history
: new CircularArray()
179 history
: new CircularArray()
183 history
: new CircularArray()
186 history
: new CircularArray()
192 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
193 const getTaskFunctionQueueSize
= (): number => {
194 let taskFunctionQueueSize
= 0
195 for (const task
of this.tasksQueue
) {
197 (task
.name
=== DEFAULT_TASK_NAME
&&
198 name
=== (this.info
.taskFunctions
as string[])[1]) ||
199 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
201 ++taskFunctionQueueSize
204 return taskFunctionQueueSize
210 get
queued (): number {
211 return getTaskFunctionQueueSize()
216 history
: new CircularArray()
219 history
: new CircularArray()
223 history
: new CircularArray()
226 history
: new CircularArray()
233 * Gets the worker id.
235 * @param worker - The worker.
236 * @param workerType - The worker type.
237 * @returns The worker id.
239 private getWorkerId (
241 workerType
: WorkerType
242 ): number | undefined {
243 if (workerType
=== WorkerTypes
.thread
) {
244 return worker
.threadId
245 } else if (workerType
=== WorkerTypes
.cluster
) {