1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
12 import { Deque
} from
'../deque'
18 type WorkerNodeEventCallback
,
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
// Worker node: groups a worker with its info, usage statistics and tasks queue.
30 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
31 implements IWorkerNode
<Worker
, Data
> {
// Worker instance associated with this node.
33 public readonly worker
: Worker
// Worker information; built by initWorkerInfo() in the constructor.
35 public readonly info
: WorkerInfo
// Worker usage statistics; (re)built by initWorkerUsage() in the constructor and in resetUsage().
37 public usage
: WorkerUsage
// Optional strategy data; neither read nor written by the code visible in this class.
39 public strategyData
?: StrategyData
// Message channel; created in the constructor only when info.type is WorkerTypes.thread, removed by closeChannel().
41 public messageChannel
?: MessageChannel
// Queue size at or above which hasBackPressure() reports true.
43 public tasksQueueBackPressureSize
: number
// Optional callback invoked with the worker id when a task is added while the queue has back pressure.
45 public onBackPressure
?: WorkerNodeEventCallback
// Optional callback invoked with the worker id by startOnEmptyQueue().
47 public onEmptyQueue
?: WorkerNodeEventCallback
// Double-ended queue holding the tasks waiting on this node.
48 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Guard set to true around the onBackPressure callback call so it is not re-triggered while running.
49 private onBackPressureStarted
: boolean
// Pass counter of startOnEmptyQueue(); dequeueTask()/popTask() only start a new cycle when it is 0.
50 private onEmptyQueueCount
: number
// Usage statistics per task function name; filled lazily by getTaskFunctionWorkerUsage().
51 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
/**
 * Constructs a new worker node.
 *
 * @param worker - The worker.
 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
 * @throws TypeError | RangeError When `checkWorkerNodeArguments()` rejects the arguments.
 */
constructor (worker: Worker, tasksQueueBackPressureSize: number) {
  // Validate before any state is built from the arguments.
  this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
  // The readonly `worker` field must be definitely assigned here.
  this.worker = worker
  this.info = this.initWorkerInfo(worker)
  this.usage = this.initWorkerUsage()
  // Only thread workers get a dedicated message channel.
  if (this.info.type === WorkerTypes.thread) {
    this.messageChannel = new MessageChannel()
  }
  this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
  this.tasksQueue = new Deque<Task<Data>>()
  this.onBackPressureStarted = false
  this.onEmptyQueueCount = 0
  this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/**
 * Gets the number of tasks currently held in the tasks queue.
 *
 * @returns The tasks queue size.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Appends a task to the back of the tasks queue.
 *
 * When an `onBackPressure` callback is registered, the queue has back
 * pressure and no such callback is already running, the callback is invoked
 * with the worker id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()` (presumably the new queue size - confirm against `Deque`).
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.push(task)
  if (
    this.onBackPressure != null &&
    this.hasBackPressure() &&
    !this.onBackPressureStarted
  ) {
    // Guard against re-triggering the callback from tasks queued while it runs.
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      // Always release the guard: a throwing callback must not silence
      // every later back pressure notification.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Inserts a task at the front of the tasks queue.
 *
 * When an `onBackPressure` callback is registered, the queue has back
 * pressure and no such callback is already running, the callback is invoked
 * with the worker id.
 *
 * @param task - The task to insert.
 * @returns The value returned by the queue's `unshift()` (presumably the new queue size - confirm against `Deque`).
 */
public unshiftTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.unshift(task)
  if (
    this.onBackPressure != null &&
    this.hasBackPressure() &&
    !this.onBackPressureStarted
  ) {
    // Guard against re-triggering the callback from tasks queued while it runs.
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      // Always release the guard: a throwing callback must not silence
      // every later back pressure notification.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Removes and returns the task at the front of the tasks queue.
 *
 * If an `onEmptyQueue` callback is registered, the queue is empty after the
 * removal and no empty-queue cycle is in progress (`onEmptyQueueCount` is 0),
 * an empty-queue notification cycle is started in the background.
 *
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const task = this.tasksQueue.shift()
  if (
    this.onEmptyQueue != null &&
    this.tasksQueue.size === 0 &&
    this.onEmptyQueueCount === 0
  ) {
    // Fire and forget: rejections are deliberately discarded.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return task
}
/**
 * Removes and returns the task at the back of the tasks queue.
 *
 * If an `onEmptyQueue` callback is registered, the queue is empty after the
 * removal and no empty-queue cycle is in progress (`onEmptyQueueCount` is 0),
 * an empty-queue notification cycle is started in the background.
 *
 * @returns The popped task, or `undefined` when the queue yields none.
 */
public popTask (): Task<Data> | undefined {
  const task = this.tasksQueue.pop()
  if (
    this.onEmptyQueue != null &&
    this.tasksQueue.size === 0 &&
    this.onEmptyQueueCount === 0
  ) {
    // Fire and forget: rejections are deliberately discarded.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return task
}
/**
 * Empties the tasks queue by calling its `clear()` method.
 */
public clearTasksQueue (): void {
  const queue = this.tasksQueue
  queue.clear()
}
/**
 * Tells whether the tasks queue has reached the back pressure size.
 *
 * @returns `true` when the queue size is greater than or equal to `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const { size } = this.tasksQueue
  return size >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics: replaces the worker usage with a freshly
 * initialised one and drops every per-task-function usage entry.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel, if any: both ports are unreferenced, then both
 * are closed, and finally the `messageChannel` property is deleted.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  // Same call order as before: unref both ports first, then close both.
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics kept for a task function, creating the entry
 * with initTaskFunctionWorkerUsage() the first time a name is requested.
 *
 * NOTE(review): the statements that raise the two messages below are not
 * visible in this view; the error class should be confirmed in the full file.
 *
 * @param name - The task function name. DEFAULT_TASK_NAME is resolved to the second entry of `info.taskFunctionNames`.
 * @returns The usage entry stored for the resolved name.
 */
163 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// Precondition 1: the task function names list must be an array.
164 if (!Array.isArray(this.info
.taskFunctionNames
)) {
166 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Precondition 2: the list must hold at least 3 names.
170 Array.isArray(this.info
.taskFunctionNames
) &&
171 this.info
.taskFunctionNames
.length
< 3
174 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is an alias for the name stored at index 1.
177 if (name
=== DEFAULT_TASK_NAME
) {
178 name
= this.info
.taskFunctionNames
[1]
// Lazily create the usage entry on first access.
180 if (!this.taskFunctionsUsage
.has(name
)) {
181 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
183 return this.taskFunctionsUsage
.get(name
)
/**
 * Removes the usage statistics entry kept for a task function.
 *
 * @param name - The task function name.
 * @returns `true` when an entry existed and was removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const removed = this.taskFunctionsUsage.delete(name)
  return removed
}
/**
 * Runs the empty-queue notification cycle.
 *
 * Each pass increments `onEmptyQueueCount`, invokes `onEmptyQueue` (if set)
 * with the worker id, waits for the delay computed by `exponentialDelay()`
 * from the pass counter, then starts the next pass. The cycle ends, and the
 * counter is reset to 0, as soon as a pass after the first finds tasks
 * executing or queued.
 *
 * @returns A promise that resolves once the cycle has ended.
 */
private async startOnEmptyQueue (): Promise<void> {
  if (
    this.onEmptyQueueCount > 0 &&
    (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
  ) {
    this.onEmptyQueueCount = 0
    // Stop here: without this return the reset is overwritten at once
    // and the recursion below never terminates.
    return
  }
  ++this.onEmptyQueueCount
  this.onEmptyQueue?.(this.info.id as number)
  await sleep(exponentialDelay(this.onEmptyQueueCount))
  await this.startOnEmptyQueue()
}
/**
 * Builds the initial information record for the given worker.
 *
 * NOTE(review): only the `id` and `type` entries are visible in this view;
 * any further fields of the returned object are not shown here.
 *
 * @param worker - The worker to describe.
 * @returns The worker information.
 */
205 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
// `id` is obtained from getWorkerId(worker).
207 id
: getWorkerId(worker
),
// `type` is obtained from getWorkerType(worker) and asserted to WorkerType.
208 type: getWorkerType(worker
) as WorkerType
,
/**
 * Builds a fresh worker usage record.
 *
 * The `queued` and `maxQueued` entries are getters, so they are evaluated on
 * every access and read the tasks queue's current `size` / `maxSize`. Four
 * `history` entries are each initialised with a new CircularArray.
 *
 * NOTE(review): the names of the sections holding the `history` entries and
 * the remaining fields of the returned object are not visible in this view.
 *
 * @returns The initial worker usage.
 */
214 private initWorkerUsage (): WorkerUsage
{
// Arrow functions capture the node's `this`; inside the object-literal
// getters below, `this` would refer to the literal instead.
215 const getTasksQueueSize
= (): number => {
216 return this.tasksQueue
.size
218 const getTasksQueueMaxSize
= (): number => {
219 return this.tasksQueue
.maxSize
225 get
queued (): number {
226 return getTasksQueueSize()
228 get
maxQueued (): number {
229 return getTasksQueueMaxSize()
235 history
: new CircularArray()
238 history
: new CircularArray()
242 history
: new CircularArray()
245 history
: new CircularArray()
/**
 * Builds a fresh usage record for one task function.
 *
 * The `queued` entry is a getter that walks the whole tasks queue on every
 * access and counts the tasks attributed to `name`. Four `history` entries
 * are each initialised with a new CircularArray.
 *
 * NOTE(review): the names of the sections holding the `history` entries and
 * the remaining fields of the returned object are not visible in this view.
 *
 * @param name - The task function name.
 * @returns The initial usage for that task function.
 */
251 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Arrow function so that `this` is the node, not the returned object literal.
252 const getTaskFunctionQueueSize
= (): number => {
253 let taskFunctionQueueSize
= 0
254 for (const task
of this.tasksQueue
) {
// A task counts when it carries the default task name and `name` is the
// entry at index 1 of info.taskFunctionNames, or when it carries a
// non-default name equal to `name`.
256 (task
.name
=== DEFAULT_TASK_NAME
&&
257 name
=== (this.info
.taskFunctionNames
as string[])[1]) ||
258 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
260 ++taskFunctionQueueSize
263 return taskFunctionQueueSize
269 get
queued (): number {
270 return getTaskFunctionQueueSize()
276 history
: new CircularArray()
279 history
: new CircularArray()
283 history
: new CircularArray()
286 history
: new CircularArray()
/**
 * Validates the constructor arguments of a worker node.
 *
 * Visible checks, in order: `worker` must not be null/undefined (TypeError);
 * `tasksQueueBackPressureSize` must not be null/undefined, must pass
 * Number.isSafeInteger() and must be greater than 0 (RangeError).
 *
 * NOTE(review): the error class raised for the second and third checks is
 * not visible in this view - confirm in the full file.
 *
 * @param tasksQueueBackPressureSize - The tasks queue back pressure size to validate.
 */
292 private checkWorkerNodeArguments (
294 tasksQueueBackPressureSize
: number
296 if (worker
== null) {
297 throw new TypeError('Cannot construct a worker node without a worker')
299 if (tasksQueueBackPressureSize
== null) {
301 'Cannot construct a worker node without a tasks queue back pressure size'
// Number.isSafeInteger() also rejects integers outside the safe integer range.
304 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
306 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
309 if (tasksQueueBackPressureSize
<= 0) {
310 throw new RangeError(
311 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'