1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
10 import { Deque
} from
'../deque'
23 * @typeParam Worker - Type of worker.
24 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
26 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
27 implements IWorkerNode
<Worker
, Data
> {
// The worker wrapped by this node.
29 public readonly worker
: Worker
// Worker information, built in the constructor by initWorkerInfo(worker, workerType).
31 public readonly info
: WorkerInfo
// Only created in the constructor for thread workers; closed and deleted by closeChannel().
33 public messageChannel
?: MessageChannel
// Usage statistics, built by initWorkerUsage() and rebuilt by resetUsage().
35 public usage
: WorkerUsage
// hasBackPressure() reports true once the tasks queue size reaches this value.
37 public tasksQueueBackPressureSize
: number
// Optional callback, called with the worker id by enqueueTask()/unshiftTask()
// when hasBackPressure() is true after the insertion.
39 public onBackPressure
?: (workerId
: number) => void
// Optional callback, called with the worker id by startOnEmptyQueue(), which is
// started by dequeueTask()/popTask() when the removal leaves the queue empty.
41 public onEmptyQueue
?: (workerId
: number) => void
// Usage statistics per task function name; filled lazily by
// getTaskFunctionWorkerUsage() and emptied by resetUsage().
42 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
// Double-ended queue holding the tasks waiting on this node.
43 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Incremented on each onEmptyQueue call made by startOnEmptyQueue() and fed to
// exponentialDelay(); reset to 0 when the queue is found non-empty.
44 private onEmptyQueueCount
: number
47 * Constructs a new worker node.
49 * @param worker - The worker.
50 * @param workerType - The worker type.
51 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
55 workerType
: WorkerType
,
56 tasksQueueBackPressureSize
: number
// Error raised when no worker is supplied.
59 throw new TypeError('Cannot construct a worker node without a worker')
// A worker type is mandatory.
61 if (workerType
== null) {
63 'Cannot construct a worker node without a worker type'
// A tasks queue back pressure size is mandatory...
66 if (tasksQueueBackPressureSize
== null) {
68 'Cannot construct a worker node without a tasks queue back pressure size'
// ...and it must pass Number.isSafeInteger().
71 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
73 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
// Build the node information from the worker and its type.
77 this.info
= this.initWorkerInfo(worker
, workerType
)
// Only thread workers get a message channel.
78 if (workerType
=== WorkerTypes
.thread
) {
79 this.messageChannel
= new MessageChannel()
// Start with fresh usage statistics, no per task function usage and an empty tasks queue.
81 this.usage
= this.initWorkerUsage()
82 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
83 this.tasksQueue
= new Deque
<Task
<Data
>>()
84 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
// Counter used by startOnEmptyQueue() starts at zero.
85 this.onEmptyQueueCount
= 0
/**
 * Gets the current size of the tasks queue.
 *
 * @returns The `size` reported by the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Appends a task at the back of the tasks queue.
 *
 * When an `onBackPressure` callback is registered and `hasBackPressure()` is
 * true after the insertion, the callback is called with this worker id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `push()` call (presumably the new
 * tasks queue size, as with `unshiftTask()`).
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.push(task)
  if (this.onBackPressure != null && this.hasBackPressure()) {
    this.onBackPressure(this.info.id as number)
  }
  // The method is declared to return a number: hand back the push() result,
  // consistently with unshiftTask().
  return tasksQueueSize
}
/**
 * Inserts a task at the front of the tasks queue.
 *
 * When an `onBackPressure` callback is registered and `hasBackPressure()` is
 * true after the insertion, the callback is called with this worker id.
 *
 * @param task - The task to insert.
 * @returns The value returned by the queue `unshift()` call.
 */
public unshiftTask (task: Task<Data>): number {
  const sizeAfterInsert = this.tasksQueue.unshift(task)
  // Nothing to notify without a callback or while below the back pressure size.
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterInsert
  }
  this.onBackPressure(this.info.id as number)
  return sizeAfterInsert
}
/**
 * Removes the task at the front of the tasks queue.
 *
 * When an `onEmptyQueue` callback is registered and the removal leaves the
 * queue empty, the empty queue notification loop is started in the
 * background; its rejections are deliberately ignored.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const task = this.tasksQueue.shift()
  if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
    // Fire and forget: errors of the notification loop must not reach the caller.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  // Hand the removed task back, as the declared return type requires.
  return task
}
/**
 * Removes the task at the back of the tasks queue.
 *
 * When an `onEmptyQueue` callback is registered and the removal leaves the
 * queue empty, the empty queue notification loop is started in the
 * background; its rejections are deliberately ignored.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public popTask (): Task<Data> | undefined {
  const task = this.tasksQueue.pop()
  if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
    // Fire and forget: errors of the notification loop must not reach the caller.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  // Hand the removed task back, as the declared return type requires.
  return task
}
/**
 * Empties the tasks queue by delegating to its `clear()` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure size.
 *
 * @returns `true` when the tasks queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  return queuedTasks >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics: the node usage is rebuilt with
 * `initWorkerUsage()` and every per task function usage entry is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel, if any.
 *
 * Both ports are unreferenced, then closed, and the `messageChannel`
 * property is finally deleted from the node. Does nothing when the node has
 * no message channel.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Same order as before: unref both ports first, then close both.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics of the task function `name`, creating the entry
 * with initTaskFunctionWorkerUsage() the first time the name is requested.
 *
 * @param name - The task function name.
 * @returns The entry stored in `taskFunctionsUsage` for the resolved name.
 */
157 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// The task function names list must already be an array.
158 if (!Array.isArray(this.info
.taskFunctions
)) {
160 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// The task function names list must hold at least 3 elements.
164 Array.isArray(this.info
.taskFunctions
) &&
165 this.info
.taskFunctions
.length
< 3
168 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is resolved to the entry at index 1 of the task function names list.
171 if (name
=== DEFAULT_TASK_NAME
) {
172 name
= this.info
.taskFunctions
[1]
// Lazily create the usage entry for this name.
174 if (!this.taskFunctionsUsage
.has(name
)) {
175 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
177 return this.taskFunctionsUsage
.get(name
)
/**
 * Repeatedly notifies `onEmptyQueue` while the tasks queue stays empty.
 *
 * Each round calls `onEmptyQueue` with this worker id, increments
 * `onEmptyQueueCount` and sleeps for `exponentialDelay(onEmptyQueueCount)`
 * before checking the queue again. As soon as the queue holds a task, the
 * counter is reset to 0 and the loop ends.
 *
 * Written as a loop instead of an async self-recursion, so a long-lasting
 * empty queue does not pile up one pending promise per notification.
 *
 * @returns A promise settled when the notification loop ends; it rejects if
 * calling `onEmptyQueue` throws (e.g. the callback has been unset meanwhile).
 */
private async startOnEmptyQueue (): Promise<void> {
  for (;;) {
    if (this.tasksQueue.size > 0) {
      // Tasks are queued again: reset the back-off and stop notifying.
      this.onEmptyQueueCount = 0
      return
    }
    (this.onEmptyQueue as (workerId: number) => void)(this.info.id as number)
    ++this.onEmptyQueueCount
    await sleep(exponentialDelay(this.onEmptyQueueCount))
  }
}
// Builds the worker information of this node from the worker and its type.
191 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
// The id is resolved by getWorkerId() from the worker and its type.
193 id
: this.getWorkerId(worker
, workerType
),
// Builds a fresh worker usage object for this node.
200 private initWorkerUsage (): WorkerUsage
{
// Arrow functions keep `this` bound to the node, so the getters below read the live tasks queue.
201 const getTasksQueueSize
= (): number => {
202 return this.tasksQueue
.size
204 const getTasksQueueMaxSize
= (): number => {
205 return this.tasksQueue
.maxSize
// `queued` is computed on each access from the current tasks queue size.
211 get
queued (): number {
212 return getTasksQueueSize()
// `maxQueued` is computed on each access from the tasks queue `maxSize`.
214 get
maxQueued (): number {
215 return getTasksQueueMaxSize()
// Each `history` starts as a new CircularArray.
221 history
: new CircularArray()
224 history
: new CircularArray()
228 history
: new CircularArray()
231 history
: new CircularArray()
// Builds a fresh usage object for the task function `name`.
237 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Counts the queued tasks that belong to `name` by iterating over the whole tasks queue.
238 const getTaskFunctionQueueSize
= (): number => {
239 let taskFunctionQueueSize
= 0
240 for (const task
of this.tasksQueue
) {
// A task carrying the default task name counts when `name` is the entry at
// index 1 of the task function names list...
242 (task
.name
=== DEFAULT_TASK_NAME
&&
243 name
=== (this.info
.taskFunctions
as string[])[1]) ||
// ...any other task counts when its name equals `name`.
244 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
246 ++taskFunctionQueueSize
249 return taskFunctionQueueSize
// `queued` is recomputed on each access from the current tasks queue content.
255 get
queued (): number {
256 return getTaskFunctionQueueSize()
// Each `history` starts as a new CircularArray.
262 history
: new CircularArray()
265 history
: new CircularArray()
269 history
: new CircularArray()
272 history
: new CircularArray()
279 * Gets the worker id.
281 * @param worker - The worker.
282 * @param workerType - The worker type.
283 * @returns The worker id.
285 private getWorkerId (
287 workerType
: WorkerType
288 ): number | undefined {
289 if (workerType
=== WorkerTypes
.thread
) {
290 return worker
.threadId
291 } else if (workerType
=== WorkerTypes
.cluster
) {