1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
12 import { Deque
} from
'../deque'
17 type WorkerNodeEventCallback
,
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
30 implements IWorkerNode
<Worker
, Data
> {
// The worker; the constructor reads its id and type once to build `info`.
32 public readonly worker
: Worker
// Worker information, built by `initWorkerInfo()` during construction.
34 public readonly info
: WorkerInfo
// Worker usage statistics, built by `initWorkerUsage()` and replaced by `resetUsage()`.
36 public usage
: WorkerUsage
// Message channel, created by the constructor only when `info.type` is
// `WorkerTypes.thread`; removed again by `closeChannel()`.
38 public messageChannel
?: MessageChannel
// Queue size at or above which `hasBackPressure()` reports back pressure.
40 public tasksQueueBackPressureSize
: number
// Optional callback, called with the worker id after a task is added
// while the queue is under back pressure.
42 public onBackPressure
?: WorkerNodeEventCallback
// Optional callback, called with the worker id (repeatedly, with a growing
// delay) after a task removal leaves the queue empty.
44 public onEmptyQueue
?: WorkerNodeEventCallback
// Pending tasks of this worker node.
45 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Number of `onEmptyQueue` calls made in the current notification cycle;
// it is the argument given to `exponentialDelay()`.
46 private onEmptyQueueCount
: number
// Usage statistics per task function name, filled on first access by
// `getTaskFunctionWorkerUsage()`.
47 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
/**
 * Constructs a new worker node.
 *
 * @param worker - The worker.
 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
 */
constructor (worker: Worker, tasksQueueBackPressureSize: number) {
  // Reject invalid arguments before any field is initialized.
  this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
  this.worker = worker
  const workerInfo = this.initWorkerInfo(worker)
  this.info = workerInfo
  // The usage object is created before `tasksQueue` is assigned below; that
  // is safe because its queue counters read the queue lazily, on access.
  this.usage = this.initWorkerUsage()
  // Only thread workers get a dedicated message channel.
  if (workerInfo.type === WorkerTypes.thread) {
    this.messageChannel = new MessageChannel()
  }
  this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
  this.tasksQueue = new Deque<Task<Data>>()
  this.onEmptyQueueCount = 0
  this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/**
 * Reports how many tasks the tasks queue currently holds.
 *
 * @returns The `size` of the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Pushes a task onto the tasks queue and, when a back pressure callback is
 * registered and the queue is under back pressure, calls it with the worker id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()` (presumably the
 * resulting queue size - confirm against `Deque`).
 */
public enqueueTask (task: Task<Data>): number {
  const sizeAfterPush = this.tasksQueue.push(task)
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterPush
  }
  this.onBackPressure(this.info.id as number)
  return sizeAfterPush
}
/**
 * Unshifts a task onto the tasks queue and, when a back pressure callback is
 * registered and the queue is under back pressure, calls it with the worker id.
 *
 * @param task - The task to add with the queue's `unshift()`.
 * @returns The value returned by the queue's `unshift()` (presumably the
 * resulting queue size - confirm against `Deque`).
 */
public unshiftTask (task: Task<Data>): number {
  const sizeAfterUnshift = this.tasksQueue.unshift(task)
  if (this.onBackPressure == null || !this.hasBackPressure()) {
    return sizeAfterUnshift
  }
  this.onBackPressure(this.info.id as number)
  return sizeAfterUnshift
}
/**
 * Removes a task from the tasks queue with `shift()`. When that leaves the
 * queue empty and an empty queue callback is registered, the empty queue
 * notification cycle is started in the background.
 *
 * @returns The task returned by the queue's `shift()`, possibly `undefined`.
 */
public dequeueTask (): Task<Data> | undefined {
  const shifted = this.tasksQueue.shift()
  const mustNotify = this.onEmptyQueue != null && this.tasksQueue.size === 0
  if (mustNotify) {
    // Fire and forget: a rejection of the cycle is deliberately discarded.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return shifted
}
/**
 * Removes a task from the tasks queue with `pop()`. When that leaves the
 * queue empty and an empty queue callback is registered, the empty queue
 * notification cycle is started in the background.
 *
 * @returns The task returned by the queue's `pop()`, possibly `undefined`.
 */
public popTask (): Task<Data> | undefined {
  const popped = this.tasksQueue.pop()
  const mustNotify = this.onEmptyQueue != null && this.tasksQueue.size === 0
  if (mustNotify) {
    // Fire and forget: a rejection of the cycle is deliberately discarded.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return popped
}
/**
 * Empties the tasks queue by calling its `clear()` method.
 */
public clearTasksQueue (): void {
  const { tasksQueue } = this
  tasksQueue.clear()
}
/**
 * Tells whether the tasks queue is under back pressure.
 *
 * @returns `true` when the tasks queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const queuedTasks = this.tasksQueue.size
  return queuedTasks >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics: replaces `usage` with a freshly initialized
 * object and drops every per task function usage entry.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel, if there is one: both ports are unref'ed, then
 * closed, and the `messageChannel` property is deleted from this node.
 * Does nothing when no message channel is set.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    return
  }
  const { port1, port2 } = channel
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics tracked for one task function, creating and
 * caching the entry in `taskFunctionsUsage` on first access.
 *
 * Two preconditions on `info.taskFunctions` are checked first: it must be an
 * array, and it must have at least 3 elements. Each check carries its own
 * error message; the statements raising them are not visible in this extract.
 *
 * @param name - The task function name. `DEFAULT_TASK_NAME` is resolved to
 * the second element of `info.taskFunctions`.
 * @returns The cached (or newly created) usage for the resolved name.
 */
138 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// Precondition 1: the task function names list must be defined.
139 if (!Array.isArray(this.info
.taskFunctions
)) {
141 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Precondition 2: the list must hold at least 3 names.
// NOTE(review): this `Array.isArray` looks redundant if the guard above
// always exits on a non-array - confirm, then simplify.
145 Array.isArray(this.info
.taskFunctions
) &&
146 this.info
.taskFunctions
.length
< 3
149 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is an alias for the task function at index 1.
152 if (name
=== DEFAULT_TASK_NAME
) {
153 name
= this.info
.taskFunctions
[1]
// Create the usage entry lazily, on first request for this name.
155 if (!this.taskFunctionsUsage
.has(name
)) {
156 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
158 return this.taskFunctionsUsage
.get(name
)
/**
 * Runs the empty queue notification cycle.
 *
 * The `onEmptyQueue` callback is called with the worker id, then the cycle
 * sleeps for `exponentialDelay(onEmptyQueueCount)` and starts over. The cycle
 * ends, and the counter is reset to 0, once at least one notification has been
 * sent and the worker either executes a task or has a task queued again.
 *
 * The caller must make sure `onEmptyQueue` is set; it is invoked unchecked.
 *
 * @returns A promise settled when the notification cycle ends.
 */
private async startOnEmptyQueue (): Promise<void> {
  const cycleIsOver = (): boolean =>
    this.onEmptyQueueCount > 0 &&
    (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
  while (!cycleIsOver()) {
    (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
    ++this.onEmptyQueueCount
    await sleep(exponentialDelay(this.onEmptyQueueCount))
  }
  this.onEmptyQueueCount = 0
}
/**
 * Builds the worker information object for the given worker.
 *
 * The visible properties are `id`, obtained from `getWorkerId(worker)`, and
 * `type`, obtained from `getWorkerType(worker)` and asserted to `WorkerType`.
 * NOTE(review): this extract elides lines of the returned object, so further
 * properties may be set - check the full file.
 *
 * @param worker - The worker to describe.
 * @returns The worker information.
 */
175 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
177 id
: getWorkerId(worker
),
178 type: getWorkerType(worker
) as WorkerType
,
/**
 * Builds a fresh worker usage object.
 *
 * The `queued` and `maxQueued` counters are getters backed by closures that
 * read `this.tasksQueue` each time they are accessed. This laziness matters:
 * the constructor calls this method before it assigns `tasksQueue`.
 * Each `history` is a new, empty `CircularArray`.
 * NOTE(review): the keys owning the four `history` entries, and any other
 * counters, are elided from this extract.
 *
 * @returns The initialized worker usage.
 */
184 private initWorkerUsage (): WorkerUsage
{
// Read lazily: `tasksQueue` may not be assigned yet when this method runs.
185 const getTasksQueueSize
= (): number => {
186 return this.tasksQueue
.size
188 const getTasksQueueMaxSize
= (): number => {
189 return this.tasksQueue
.maxSize
// Live view of the current queue size.
195 get
queued (): number {
196 return getTasksQueueSize()
// Live view of the queue's `maxSize`.
198 get
maxQueued (): number {
199 return getTasksQueueMaxSize()
205 history
: new CircularArray()
208 history
: new CircularArray()
212 history
: new CircularArray()
215 history
: new CircularArray()
/**
 * Builds a fresh usage object scoped to one task function.
 *
 * Its `queued` counter is a getter that walks the whole tasks queue on every
 * access and counts the tasks belonging to `name`. A task belongs to `name`
 * when its own name equals `name`, or when it carries `DEFAULT_TASK_NAME` and
 * `name` is the second element of `info.taskFunctions`.
 * Each `history` is a new, empty `CircularArray`.
 * NOTE(review): the keys owning the four `history` entries, and any other
 * counters, are elided from this extract.
 *
 * @param name - The task function name the usage is tracked for.
 * @returns The initialized task function worker usage.
 */
221 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Evaluated on each read of `queued`; cost grows with the queue length.
222 const getTaskFunctionQueueSize
= (): number => {
223 let taskFunctionQueueSize
= 0
224 for (const task
of this.tasksQueue
) {
// Default-named tasks count toward the task function at index 1.
226 (task
.name
=== DEFAULT_TASK_NAME
&&
227 name
=== (this.info
.taskFunctions
as string[])[1]) ||
// Explicitly named tasks count toward their own name.
228 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
230 ++taskFunctionQueueSize
233 return taskFunctionQueueSize
// Live count of queued tasks for this task function.
239 get
queued (): number {
240 return getTaskFunctionQueueSize()
246 history
: new CircularArray()
249 history
: new CircularArray()
253 history
: new CircularArray()
256 history
: new CircularArray()
262 private checkWorkerNodeArguments (
264 tasksQueueBackPressureSize
: number
266 if (worker
== null) {
267 throw new TypeError('Cannot construct a worker node without a worker')
269 if (tasksQueueBackPressureSize
== null) {
271 'Cannot construct a worker node without a tasks queue back pressure size'
274 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
276 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
279 if (tasksQueueBackPressureSize
<= 0) {
280 throw new RangeError(
281 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'