1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
4 import { DEFAULT_TASK_NAME
, once
} from
'../utils'
5 import { Deque
} from
'../deque'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
24 public readonly worker
: Worker
26 public readonly info
: WorkerInfo
28 public messageChannel
?: MessageChannel
30 public usage
: WorkerUsage
32 public tasksQueueBackPressureSize
: number
34 public onBackPressure
?: (workerId
: number) => void
36 public onEmptyQueue
?: (workerId
: number) => void
37 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
38 private readonly tasksQueue
: Deque
<Task
<Data
>>
41 * Constructs a new worker node.
43 * @param worker - The worker.
44 * @param workerType - The worker type.
45 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
49 workerType
: WorkerType
,
50 tasksQueueBackPressureSize
: number
53 throw new TypeError('Cannot construct a worker node without a worker')
55 if (workerType
== null) {
57 'Cannot construct a worker node without a worker type'
60 if (tasksQueueBackPressureSize
== null) {
62 'Cannot construct a worker node without a tasks queue back pressure size'
65 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
67 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
71 this.info
= this.initWorkerInfo(worker
, workerType
)
72 if (workerType
=== WorkerTypes
.thread
) {
73 this.messageChannel
= new MessageChannel()
75 this.usage
= this.initWorkerUsage()
76 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
77 this.tasksQueue
= new Deque
<Task
<Data
>>()
78 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
82 public tasksQueueSize (): number {
83 return this.tasksQueue
.size
87 public enqueueTask (task
: Task
<Data
>): number {
88 const tasksQueueSize
= this.tasksQueue
.push(task
)
89 if (this.onBackPressure
!= null && this.hasBackPressure()) {
90 once(this.onBackPressure
, this)(this.info
.id
as number)
96 public unshiftTask (task
: Task
<Data
>): number {
97 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
98 if (this.onBackPressure
!= null && this.hasBackPressure()) {
99 once(this.onBackPressure
, this)(this.info
.id
as number)
101 return tasksQueueSize
105 public dequeueTask (): Task
<Data
> | undefined {
106 const task
= this.tasksQueue
.shift()
107 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
108 once(this.onEmptyQueue
, this)(this.info
.id
as number)
114 public popTask (): Task
<Data
> | undefined {
115 const task
= this.tasksQueue
.pop()
116 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
117 once(this.onEmptyQueue
, this)(this.info
.id
as number)
123 public clearTasksQueue (): void {
124 this.tasksQueue
.clear()
128 public hasBackPressure (): boolean {
129 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
133 public resetUsage (): void {
134 this.usage
= this.initWorkerUsage()
135 this.taskFunctionsUsage
.clear()
139 public closeChannel (): void {
140 if (this.messageChannel
!= null) {
141 this.messageChannel
?.port1
.unref()
142 this.messageChannel
?.port2
.unref()
143 this.messageChannel
?.port1
.close()
144 this.messageChannel
?.port2
.close()
145 delete this.messageChannel
150 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
151 if (!Array.isArray(this.info
.taskFunctions
)) {
153 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
157 Array.isArray(this.info
.taskFunctions
) &&
158 this.info
.taskFunctions
.length
< 3
161 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
164 if (name
=== DEFAULT_TASK_NAME
) {
165 name
= this.info
.taskFunctions
[1]
167 if (!this.taskFunctionsUsage
.has(name
)) {
168 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
170 return this.taskFunctionsUsage
.get(name
)
173 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
175 id
: this.getWorkerId(worker
, workerType
),
182 private initWorkerUsage (): WorkerUsage
{
183 const getTasksQueueSize
= (): number => {
184 return this.tasksQueue
.size
186 const getTasksQueueMaxSize
= (): number => {
187 return this.tasksQueue
.maxSize
193 get
queued (): number {
194 return getTasksQueueSize()
196 get
maxQueued (): number {
197 return getTasksQueueMaxSize()
202 history
: new CircularArray()
205 history
: new CircularArray()
209 history
: new CircularArray()
212 history
: new CircularArray()
218 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
219 const getTaskFunctionQueueSize
= (): number => {
220 let taskFunctionQueueSize
= 0
221 for (const task
of this.tasksQueue
) {
223 (task
.name
=== DEFAULT_TASK_NAME
&&
224 name
=== (this.info
.taskFunctions
as string[])[1]) ||
225 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
227 ++taskFunctionQueueSize
230 return taskFunctionQueueSize
236 get
queued (): number {
237 return getTaskFunctionQueueSize()
242 history
: new CircularArray()
245 history
: new CircularArray()
249 history
: new CircularArray()
252 history
: new CircularArray()
259 * Gets the worker id.
261 * @param worker - The worker.
262 * @param workerType - The worker type.
263 * @returns The worker id.
265 private getWorkerId (
267 workerType
: WorkerType
268 ): number | undefined {
269 if (workerType
=== WorkerTypes
.thread
) {
270 return worker
.threadId
271 } else if (workerType
=== WorkerTypes
.cluster
) {