1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
10 import { Deque
} from
'../deque'
12 type BackPressureCallback
,
13 type EmptyQueueCallback
,
25 * @typeParam Worker - Type of worker.
26 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
28 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
29 implements IWorkerNode
<Worker
, Data
> {
// The worker wrapped by this node (declared readonly).
31 public readonly worker
: Worker
// Worker information; the constructor builds it with `initWorkerInfo()`.
33 public readonly info
: WorkerInfo
// Worker usage statistics; replaced by a fresh record in `resetUsage()`.
35 public usage
: WorkerUsage
// Message channel; the constructor creates it only when the worker type is `WorkerTypes.thread`,
// and `closeChannel()` removes it.
37 public messageChannel
?: MessageChannel
// Tasks queue size at or above which `hasBackPressure()` returns true.
39 public tasksQueueBackPressureSize
: number
// Optional callback; `enqueueTask()` and `unshiftTask()` call it with the worker id
// when the queue has back pressure after the insertion.
41 public onBackPressure
?: BackPressureCallback
// Optional callback; `startOnEmptyQueue()` calls it with the worker id after
// `dequeueTask()` or `popTask()` left the queue empty.
43 public onEmptyQueue
?: EmptyQueueCallback
// Double-ended queue holding the tasks waiting on this worker node.
44 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Counter incremented on each `onEmptyQueue` notification; it is the argument given to `exponentialDelay()`.
45 private onEmptyQueueCount
: number
// Usage statistics per task function name, filled lazily by `getTaskFunctionWorkerUsage()`.
46 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
49 * Constructs a new worker node.
51 * @param worker - The worker.
52 * @param workerType - The worker type.
53 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
57 workerType
: WorkerType
,
58 tasksQueueBackPressureSize
: number
61 throw new TypeError('Cannot construct a worker node without a worker')
// A worker type is mandatory: `== null` matches both null and undefined.
63 if (workerType
== null) {
65 'Cannot construct a worker node without a worker type'
// A tasks queue back pressure size is mandatory as well.
68 if (tasksQueueBackPressureSize
== null) {
70 'Cannot construct a worker node without a tasks queue back pressure size'
// The size must be a safe integer: NaN, Infinity and fractional values fail this check.
73 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
75 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
// Build the worker information and a fresh usage record.
79 this.info
= this.initWorkerInfo(worker
, workerType
)
80 this.usage
= this.initWorkerUsage()
// Only thread workers get a message channel.
81 if (workerType
=== WorkerTypes
.thread
) {
82 this.messageChannel
= new MessageChannel()
// Store the back pressure threshold, then start with an empty queue, a zeroed
// empty-queue counter and no per task function usage.
84 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
85 this.tasksQueue
= new Deque
<Task
<Data
>>()
86 this.onEmptyQueueCount
= 0
87 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
/**
 * Gets the number of tasks currently held in this worker node tasks queue.
 *
 * @returns The tasks queue size.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Adds a task to the tasks queue with `push()`.
 *
 * When an `onBackPressure` callback is set and the queue has back pressure
 * after the insertion, the callback is called with the worker id.
 *
 * @param task - The task to queue.
 * @returns The value returned by the queue `push()` call.
 */
public enqueueTask (task: Task<Data>): number {
  const sizeAfterPush = this.tasksQueue.push(task)
  // Nothing to notify without a callback or below the back pressure threshold.
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterPush
  }
  this.onBackPressure(this.info.id as number)
  return sizeAfterPush
}
/**
 * Adds a task to the tasks queue with `unshift()`.
 *
 * When an `onBackPressure` callback is set and the queue has back pressure
 * after the insertion, the callback is called with the worker id.
 *
 * @param task - The task to queue.
 * @returns The value returned by the queue `unshift()` call.
 */
public unshiftTask (task: Task<Data>): number {
  const sizeAfterUnshift = this.tasksQueue.unshift(task)
  // Nothing to notify without a callback or below the back pressure threshold.
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterUnshift
  }
  this.onBackPressure(this.info.id as number)
  return sizeAfterUnshift
}
/**
 * Removes a task from the tasks queue with `shift()`.
 *
 * If an `onEmptyQueue` callback is set and the queue size is 0 afterwards, the
 * empty queue notification loop (`startOnEmptyQueue()`) is started without being
 * awaited; a rejection of that loop is handed to `EMPTY_FUNCTION`.
 *
 * NOTE(review): the return statement is not visible in this excerpt; given the
 * declared return type it presumably returns the shifted task - confirm.
 */
114 public dequeueTask (): Task
<Data
> | undefined {
115 const task
= this.tasksQueue
.shift()
116 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
117 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
/**
 * Removes a task from the tasks queue with `pop()`.
 *
 * If an `onEmptyQueue` callback is set and the queue size is 0 afterwards, the
 * empty queue notification loop (`startOnEmptyQueue()`) is started without being
 * awaited; a rejection of that loop is handed to `EMPTY_FUNCTION`.
 *
 * NOTE(review): the return statement is not visible in this excerpt; given the
 * declared return type it presumably returns the popped task - confirm.
 */
123 public popTask (): Task
<Data
> | undefined {
124 const task
= this.tasksQueue
.pop()
125 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
126 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
/**
 * Empties this worker node tasks queue by calling its `clear()` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure threshold.
 *
 * @returns `true` if the tasks queue size is greater than or equal to `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  const threshold = this.tasksQueueBackPressureSize
  return queuedTasks >= threshold
}
/**
 * Resets the usage statistics: the worker usage is replaced by a freshly
 * initialized record and the per task function usage map is emptied.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel, if there is one.
 *
 * Both ports are unreferenced first, then both are closed, and finally the
 * `messageChannel` property is deleted from the worker node.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const ports = [channel.port1, channel.port2]
  // Same call order as before: unref port1 and port2, then close port1 and port2.
  for (const port of ports) {
    port.unref()
  }
  for (const port of ports) {
    port.close()
  }
  delete this.messageChannel
}
/**
 * Gets the usage statistics tracked for one task function, creating them on first access.
 *
 * Two conditions on `this.info.taskFunctions` are checked first: it is not an array,
 * or it is an array with fewer than 3 elements. Each condition has its own message.
 * NOTE(review): the statements that raise these messages are not visible in this
 * excerpt (presumably `throw`) - confirm.
 *
 * @param name - The task function name. `DEFAULT_TASK_NAME` is replaced by the name stored at index 1 of `this.info.taskFunctions`.
 * @returns The usage stored in `taskFunctionsUsage` for the resolved name.
 */
159 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
160 if (!Array.isArray(this.info
.taskFunctions
)) {
162 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
166 Array.isArray(this.info
.taskFunctions
) &&
167 this.info
.taskFunctions
.length
< 3
170 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// Resolve the default task name to the name stored at index 1 of the task functions list.
173 if (name
=== DEFAULT_TASK_NAME
) {
174 name
= this.info
.taskFunctions
[1]
// Create the usage record the first time this name is requested.
176 if (!this.taskFunctionsUsage
.has(name
)) {
177 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
179 return this.taskFunctionsUsage
.get(name
)
/**
 * Empty queue notification loop.
 *
 * Each pass calls the `onEmptyQueue` callback with the worker id, increments
 * `onEmptyQueueCount`, sleeps for `exponentialDelay(onEmptyQueueCount)` and then
 * calls itself again.
 *
 * The counter is reset to 0 under a condition combining `onEmptyQueueCount > 0`
 * with the worker having executing tasks or a non-empty queue.
 * NOTE(review): the lines opening and closing that branch are not visible in this
 * excerpt; presumably the branch returns and so ends the loop - confirm.
 */
182 private async startOnEmptyQueue (): Promise
<void> {
184 this.onEmptyQueueCount
> 0 &&
185 (this.usage
.tasks
.executing
> 0 || this.tasksQueue
.size
> 0)
187 this.onEmptyQueueCount
= 0
// Notify, then wait for a delay computed from the number of notifications before recursing.
190 (this.onEmptyQueue
as EmptyQueueCallback
)(this.info
.id
as number)
191 ++this.onEmptyQueueCount
192 await sleep(exponentialDelay(this.onEmptyQueueCount
))
193 await this.startOnEmptyQueue()
/**
 * Builds the worker information for this worker node.
 *
 * The `id` field is obtained from `getWorkerId(worker, workerType)`.
 * NOTE(review): the other fields of the returned object are not visible in this excerpt.
 *
 * @param worker - The worker.
 * @param workerType - The worker type.
 * @returns The worker information.
 */
196 private initWorkerInfo (worker
: Worker
, workerType
: WorkerType
): WorkerInfo
{
198 id
: this.getWorkerId(worker
, workerType
),
/**
 * Builds a fresh worker usage record.
 *
 * The `queued` and `maxQueued` getters are live views: they read the tasks queue
 * `size` and `maxSize` each time they are accessed, through the two closures below.
 * Four `history` fields are initialized with a new `CircularArray`.
 * NOTE(review): the remaining fields of the returned object are not visible in this excerpt.
 *
 * @returns The worker usage.
 */
205 private initWorkerUsage (): WorkerUsage
{
// Closures capture `this` so the getters below can read the queue of this worker node.
206 const getTasksQueueSize
= (): number => {
207 return this.tasksQueue
.size
209 const getTasksQueueMaxSize
= (): number => {
210 return this.tasksQueue
.maxSize
216 get
queued (): number {
217 return getTasksQueueSize()
219 get
maxQueued (): number {
220 return getTasksQueueMaxSize()
226 history
: new CircularArray()
229 history
: new CircularArray()
233 history
: new CircularArray()
236 history
: new CircularArray()
/**
 * Builds a fresh usage record for one task function.
 *
 * The `queued` getter is a live view: on each access it iterates the tasks queue and
 * counts the tasks belonging to the task function `name`. A queued task named
 * `DEFAULT_TASK_NAME` is matched when `name` equals the entry at index 1 of
 * `this.info.taskFunctions`; any other task is matched when its name equals `name`.
 * Four `history` fields are initialized with a new `CircularArray`.
 * NOTE(review): the remaining fields of the returned object are not visible in this excerpt.
 *
 * @param name - The task function name.
 * @returns The task function worker usage.
 */
242 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
243 const getTaskFunctionQueueSize
= (): number => {
244 let taskFunctionQueueSize
= 0
// Walks the whole queue on every call.
245 for (const task
of this.tasksQueue
) {
247 (task
.name
=== DEFAULT_TASK_NAME
&&
248 name
=== (this.info
.taskFunctions
as string[])[1]) ||
249 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
251 ++taskFunctionQueueSize
254 return taskFunctionQueueSize
260 get
queued (): number {
261 return getTaskFunctionQueueSize()
267 history
: new CircularArray()
270 history
: new CircularArray()
274 history
: new CircularArray()
277 history
: new CircularArray()
284 * Gets the worker id.
286 * @param worker - The worker.
287 * @param workerType - The worker type.
288 * @returns The worker id.
290 private getWorkerId (
292 workerType
: WorkerType
293 ): number | undefined {
294 if (workerType
=== WorkerTypes
.thread
) {
295 return worker
.threadId
296 } else if (workerType
=== WorkerTypes
.cluster
) {