1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
10 import { Deque
} from
'../deque'
/**
 * Callback invoked with a worker id when that worker node's tasks queue
 * has been drained (see `dequeueTask()` / `popTask()`).
 */
type EmptyQueueCallback = (workerId: number) => void

/**
 * Callback invoked with a worker id when that worker node's tasks queue
 * reaches its back pressure size (see `enqueueTask()` / `unshiftTask()`).
 * Shares the signature of {@link EmptyQueueCallback}.
 */
type BackPressureCallback = EmptyQueueCallback
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
30 implements IWorkerNode
<Worker
, Data
> {
32 public readonly worker
: Worker
34 public readonly info
: WorkerInfo
36 public usage
: WorkerUsage
38 public messageChannel
?: MessageChannel
40 public tasksQueueBackPressureSize
: number
42 public onBackPressure
?: BackPressureCallback
44 public onEmptyQueue
?: EmptyQueueCallback
45 private readonly tasksQueue
: Deque
<Task
<Data
>>
46 private onEmptyQueueCount
: number
47 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
50 * Constructs a new worker node.
52 * @param worker - The worker.
53 * @param workerType - The worker type.
54 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
58 workerType
: WorkerType
,
59 tasksQueueBackPressureSize
: number
62 throw new TypeError('Cannot construct a worker node without a worker')
64 if (workerType
== null) {
66 'Cannot construct a worker node without a worker type'
69 if (tasksQueueBackPressureSize
== null) {
71 'Cannot construct a worker node without a tasks queue back pressure size'
74 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
76 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
80 this.info
= this.initWorkerInfo(worker
, workerType
)
81 this.usage
= this.initWorkerUsage()
82 if (workerType
=== WorkerTypes
.thread
) {
83 this.messageChannel
= new MessageChannel()
85 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
86 this.tasksQueue
= new Deque
<Task
<Data
>>()
87 this.onEmptyQueueCount
= 0
88 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Gets the number of tasks currently held in this worker node's queue.
 *
 * @returns The `size` reported by the underlying tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Appends a task to the tasks queue and, when a back pressure callback is
 * registered and the queue has reached its back pressure size, notifies it
 * with this worker's id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()` (used as the tasks queue size).
 */
public enqueueTask (task: Task<Data>): number {
  const queueSizeAfterPush = this.tasksQueue.push(task)
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return queueSizeAfterPush
  }
  // Invoked through `this` so the callback keeps the worker node as receiver.
  this.onBackPressure(this.info.id as number)
  return queueSizeAfterPush
}
/**
 * Inserts a task at the front of the tasks queue and, when a back pressure
 * callback is registered and the queue has reached its back pressure size,
 * notifies it with this worker's id.
 *
 * @param task - The task to put at the head of the queue.
 * @returns The value returned by the queue's `unshift()` (used as the tasks queue size).
 */
public unshiftTask (task: Task<Data>): number {
  const queueSizeAfterUnshift = this.tasksQueue.unshift(task)
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return queueSizeAfterUnshift
  }
  // Invoked through `this` so the callback keeps the worker node as receiver.
  this.onBackPressure(this.info.id as number)
  return queueSizeAfterUnshift
}
/**
 * Removes and returns the task at the head of the tasks queue.
 *
 * If an empty-queue callback is registered and the queue is empty after the
 * removal, the empty-queue notification cycle is started in the background;
 * its rejections are deliberately discarded.
 *
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const headTask = this.tasksQueue.shift()
  if (this.onEmptyQueue == null || this.tasksQueue.size !== 0) {
    return headTask
  }
  // Fire-and-forget: the notification cycle must not delay the caller.
  this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  return headTask
}
/**
 * Removes and returns the task at the tail of the tasks queue.
 *
 * If an empty-queue callback is registered and the queue is empty after the
 * removal, the empty-queue notification cycle is started in the background;
 * its rejections are deliberately discarded.
 *
 * @returns The popped task, or `undefined` when the queue yields none.
 */
public popTask (): Task<Data> | undefined {
  const tailTask = this.tasksQueue.pop()
  if (this.onEmptyQueue == null || this.tasksQueue.size !== 0) {
    return tailTask
  }
  // Fire-and-forget: the notification cycle must not delay the caller.
  this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  return tailTask
}
/**
 * Empties this worker node's tasks queue by delegating to the queue's
 * own `clear()`.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the configured back pressure size.
 *
 * @returns `true` when the queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const { size } = this.tasksQueue
  return size >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics of this worker node: the worker-level usage
 * is replaced by a freshly initialized one and every cached per-task-function
 * usage is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Releases the message channel of this worker node, if there is one.
 *
 * Both ports are unref'ed, then both are closed, and finally the
 * `messageChannel` property is removed from the node. Does nothing when no
 * channel is set.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics tracked for one task function, creating and
 * caching them on first access.
 *
 * The default task name is resolved to the second entry of
 * `info.taskFunctions` before the lookup.
 *
 * @param name - The task function name.
 * @returns The usage statistics registered under the resolved name.
 * @throws If `info.taskFunctions` is not an array, or has fewer than 3 elements.
 */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
  const taskFunctions = this.info.taskFunctions
  // NOTE(review): the `throw` statement heads were not visible in the reviewed
  // extract; plain `Error` is assumed for both guards - confirm.
  if (!Array.isArray(taskFunctions)) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
    )
  }
  if (taskFunctions.length < 3) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
    )
  }
  const usageKey = name === DEFAULT_TASK_NAME ? taskFunctions[1] : name
  if (!this.taskFunctionsUsage.has(usageKey)) {
    this.taskFunctionsUsage.set(
      usageKey,
      this.initTaskFunctionWorkerUsage(usageKey)
    )
  }
  return this.taskFunctionsUsage.get(usageKey)
}
/**
 * Runs the empty-queue notification cycle.
 *
 * Each round invokes the `onEmptyQueue` callback with this worker's id,
 * increments the round counter and sleeps for `exponentialDelay(counter)`.
 * The cycle stops, and the counter is reset to 0, as soon as at least one
 * round has run and the worker is executing tasks or has queued tasks again.
 *
 * @returns A promise settled once the cycle has stopped.
 */
private async startOnEmptyQueue (): Promise<void> {
  // NOTE(review): the guard's `if (` / `return` lines were not visible in the
  // reviewed extract and are reconstructed from context - confirm.
  for (;;) {
    if (
      this.onEmptyQueueCount > 0 &&
      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
    ) {
      this.onEmptyQueueCount = 0
      return
    }
    (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
    ++this.onEmptyQueueCount
    await sleep(exponentialDelay(this.onEmptyQueueCount))
  }
}
197 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
199 id
: this.getWorkerId(worker
, workerType
),
206 private initWorkerUsage (): WorkerUsage
{
207 const getTasksQueueSize
= (): number => {
208 return this.tasksQueue
.size
210 const getTasksQueueMaxSize
= (): number => {
211 return this.tasksQueue
.maxSize
217 get
queued (): number {
218 return getTasksQueueSize()
220 get
maxQueued (): number {
221 return getTasksQueueMaxSize()
227 history
: new CircularArray()
230 history
: new CircularArray()
234 history
: new CircularArray()
237 history
: new CircularArray()
243 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
244 const getTaskFunctionQueueSize
= (): number => {
245 let taskFunctionQueueSize
= 0
246 for (const task
of this.tasksQueue
) {
248 (task
.name
=== DEFAULT_TASK_NAME
&&
249 name
=== (this.info
.taskFunctions
as string[])[1]) ||
250 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
252 ++taskFunctionQueueSize
255 return taskFunctionQueueSize
261 get
queued (): number {
262 return getTaskFunctionQueueSize()
268 history
: new CircularArray()
271 history
: new CircularArray()
275 history
: new CircularArray()
278 history
: new CircularArray()
285 * Gets the worker id.
287 * @param worker - The worker.
288 * @param workerType - The worker type.
289 * @returns The worker id.
291 private getWorkerId (
293 workerType
: WorkerType
294 ): number | undefined {
295 if (workerType
=== WorkerTypes
.thread
) {
296 return worker
.threadId
297 } else if (workerType
=== WorkerTypes
.cluster
) {