1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
4 import { DEFAULT_TASK_NAME
} from
'../utils'
5 import { Deque
} from
'../deque'
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
22 implements IWorkerNode
<Worker
, Data
> {
24 public readonly worker
: Worker
26 public readonly info
: WorkerInfo
28 public messageChannel
?: MessageChannel
30 public usage
: WorkerUsage
32 public tasksQueueBackPressureSize
: number
34 public onBackPressure
?: (workerId
: number) => void
35 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
36 private readonly tasksQueue
: Deque
<Task
<Data
>>
39 * Constructs a new worker node.
41 * @param worker - The worker.
42 * @param workerType - The worker type.
43 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
47 workerType
: WorkerType
,
48 tasksQueueBackPressureSize
: number
51 throw new TypeError('Cannot construct a worker node without a worker')
53 if (workerType
== null) {
55 'Cannot construct a worker node without a worker type'
58 if (tasksQueueBackPressureSize
== null) {
60 'Cannot construct a worker node without a tasks queue back pressure size'
63 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
65 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
69 this.info
= this.initWorkerInfo(worker
, workerType
)
70 if (workerType
=== WorkerTypes
.thread
) {
71 this.messageChannel
= new MessageChannel()
73 this.usage
= this.initWorkerUsage()
74 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
75 this.tasksQueue
= new Deque
<Task
<Data
>>()
76 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
/**
 * Reports how many tasks are currently held in this worker node's tasks queue.
 *
 * @returns The `size` of the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
85 * Tasks queue maximum size.
87 * @returns The tasks queue maximum size.
private tasksQueueMaxSize (): number {
  // Forward the `maxSize` value exposed by the underlying tasks queue.
  const { maxSize } = this.tasksQueue
  return maxSize
}
/**
 * Appends a task to the back of the tasks queue and, when a back pressure
 * callback is registered and the queue has reached the back pressure size,
 * invokes that callback with this worker's id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()`, i.e. the tasks queue size after the insertion.
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.push(task)
  if (this.onBackPressure != null && this.hasBackPressure()) {
    // The callback goes through `once()` before being called; the `as number`
    // cast is kept as in the original call site.
    this.once(this.onBackPressure)(this.info.id as number)
  }
  // The method is declared to return a number: hand back the queue size,
  // mirroring what unshiftTask() does.
  return tasksQueueSize
}
/**
 * Inserts a task at the front of the tasks queue and, when a back pressure
 * callback is registered and the queue has reached the back pressure size,
 * invokes that callback with this worker's id.
 *
 * @param task - The task to insert.
 * @returns The value returned by the queue's `unshift()`.
 */
public unshiftTask (task: Task<Data>): number {
  const sizeAfterInsert = this.tasksQueue.unshift(task)
  // Nothing to notify when no callback is set or the threshold is not reached.
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterInsert
  }
  const notifyOnce = this.once(this.onBackPressure)
  notifyOnce(this.info.id as number)
  return sizeAfterInsert
}
/**
 * Removes the task at the front of the tasks queue.
 *
 * @returns The result of the queue's `shift()`: a task, or `undefined`.
 */
public dequeueTask (): Task<Data> | undefined {
  const frontTask = this.tasksQueue.shift()
  return frontTask
}
/**
 * Removes the task at the back of the tasks queue.
 *
 * @returns The result of the queue's `pop()`: a task, or `undefined`.
 */
public popTask (): Task<Data> | undefined {
  const backTask = this.tasksQueue.pop()
  return backTask
}
/**
 * Empties the tasks queue by delegating to its `clear()` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the configured back pressure size.
 *
 * @returns `true` when the queue size is greater than or equal to `tasksQueueBackPressureSize`.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const threshold = this.tasksQueueBackPressureSize
  return queuedTasks >= threshold
}
/**
 * Replaces the worker usage with a freshly initialized one and drops every
 * per-task-function usage entry.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Shuts down the message channel, if this node has one: both ports are
 * unref'ed, then closed, and the `messageChannel` property is removed.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Same call order as before: unref both ports first, then close both.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
// Looks up the WorkerUsage entry kept for the task function `name`, creating it on first access.
// NOTE(review): some lines of this method are not visible in this extract (e.g. the statements that use the two messages below).
149 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// Precondition: this.info.taskFunctions must already be an array.
150 if (!Array.isArray(this.info
.taskFunctions
)) {
152 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Precondition: the task function names list must contain at least 3 entries.
156 Array.isArray(this.info
.taskFunctions
) &&
157 this.info
.taskFunctions
.length
< 3
160 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is resolved to the name stored at index 1 of the list.
163 if (name
=== DEFAULT_TASK_NAME
) {
164 name
= this.info
.taskFunctions
[1]
// Lazily create the usage entry for this task function name.
166 if (!this.taskFunctionsUsage
.has(name
)) {
167 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
// Map#get is typed as possibly undefined, hence `| undefined` in the return type.
169 return this.taskFunctionsUsage
.get(name
)
// Builds the WorkerInfo value for this node from the worker and its type.
// NOTE(review): only the `id` field is visible in this extract; any other fields are not shown here.
172 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
// The id is whatever getWorkerId() yields for this worker/type pair.
174 id
: this.getWorkerId(worker
, workerType
),
// Creates a fresh WorkerUsage value for this worker node.
// NOTE(review): several lines of the returned structure are not visible in this extract.
181 private initWorkerUsage (): WorkerUsage
{
// Arrow functions capture this node's `this`, so the getters below can call the node's queue accessors.
182 const getTasksQueueSize
= (): number => {
183 return this.tasksQueueSize()
185 const getTasksQueueMaxSize
= (): number => {
186 return this.tasksQueueMaxSize()
// `queued` is computed on each read from tasksQueueSize().
192 get
queued (): number {
193 return getTasksQueueSize()
// `maxQueued` is computed on each read from tasksQueueMaxSize().
195 get
maxQueued (): number {
196 return getTasksQueueMaxSize()
// Each `history` field receives its own new CircularArray instance.
201 history
: new CircularArray()
204 history
: new CircularArray()
208 history
: new CircularArray()
211 history
: new CircularArray()
// Creates a fresh WorkerUsage value scoped to the task function `name`.
// NOTE(review): several lines of the returned structure are not visible in this extract.
217 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Counts the queued tasks that belong to `name` by iterating over the whole tasks queue.
218 const getTaskFunctionQueueSize
= (): number => {
219 let taskFunctionQueueSize
= 0
220 for (const task
of this.tasksQueue
) {
// A task matches when it carries the default task name and `name` is the entry at index 1
// of the task functions list, or when it carries a non-default name equal to `name`.
222 (task
.name
=== DEFAULT_TASK_NAME
&&
223 name
=== (this.info
.taskFunctions
as string[])[1]) ||
224 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
226 ++taskFunctionQueueSize
229 return taskFunctionQueueSize
// `queued` re-runs the count above on every read.
235 get
queued (): number {
236 return getTaskFunctionQueueSize()
// Each `history` field receives its own new CircularArray instance.
241 history
: new CircularArray()
244 history
: new CircularArray()
248 history
: new CircularArray()
251 history
: new CircularArray()
258 * Gets the worker id.
260 * @param worker - The worker.
261 * @param workerType - The worker type.
262 * @returns The worker id.
// Resolves the identifier of `worker` according to `workerType`.
// NOTE(review): the first parameter line and the cluster branch's return are not visible in this extract.
264 private getWorkerId (
266 workerType
: WorkerType
267 ): number | undefined {
// Thread workers are identified by their `threadId`.
268 if (workerType
=== WorkerTypes
.thread
) {
269 return worker
.threadId
// Cluster workers are handled by a separate branch.
270 } else if (workerType
=== WorkerTypes
.cluster
) {
276 * Executes a function once at a time.
278 * @param fn - The function to execute.
279 * @param context - The context to bind the function to.
280 * @returns The function to execute.
283 // eslint-disable-next-line @typescript-eslint/no-explicit-any
284 fn
: (...args
: any[]) => void,
286 // eslint-disable-next-line @typescript-eslint/no-explicit-any
287 ): (...args
: any[]) => void {
289 // eslint-disable-next-line @typescript-eslint/no-explicit-any
290 return function (...args
: any[]): void {
293 fn
.apply(context
, args
)