1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
12 import { Deque
} from
'../deque'
14 type BackPressureCallback
,
15 type EmptyQueueCallback
,
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
// Worker node: pairs a worker with its info, its usage statistics and a
// double-ended queue of pending tasks.
30 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
31 implements IWorkerNode
<Worker
, Data
> {
// The worker held by this node.
// NOTE(review): its assignment is not visible in this excerpt; presumably set
// from the constructor's `worker` argument - confirm.
33 public readonly worker
: Worker
// Worker information, built once in the constructor by initWorkerInfo(worker).
35 public readonly info
: WorkerInfo
// Worker usage statistics; created by initWorkerUsage() and replaced with a
// fresh object by resetUsage().
37 public usage
: WorkerUsage
// Message channel, created by the constructor only when
// info.type === WorkerTypes.thread, and removed again by closeChannel().
39 public messageChannel
?: MessageChannel
// Queue size threshold: hasBackPressure() is true once the tasks queue size
// is greater than or equal to this value.
41 public tasksQueueBackPressureSize
: number
// Optional callback, called with info.id after a task is enqueued or
// unshifted while hasBackPressure() is true.
43 public onBackPressure
?: BackPressureCallback
// Optional callback, called with info.id by startOnEmptyQueue(), which is
// started when dequeueTask()/popTask() leaves the tasks queue empty.
45 public onEmptyQueue
?: EmptyQueueCallback
// Pending tasks, stored in a double-ended queue.
46 private readonly tasksQueue
: Deque
<Task
<Data
>>
// Number of onEmptyQueue invocations so far in the current cycle; used as the
// argument of exponentialDelay() in startOnEmptyQueue().
47 private onEmptyQueueCount
: number
// Per task function usage, keyed by task function name; filled lazily by
// getTaskFunctionWorkerUsage() and emptied by resetUsage().
48 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
51 * Constructs a new worker node.
53 * @param worker - The worker.
54 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
// Validates its arguments, then initializes the worker info, the usage
// statistics, the optional message channel, the tasks queue and the
// per task function usage map.
// The error messages below are raised as TypeError (visible for the first one;
// NOTE(review): the `throw new TypeError(` wrappers of the two others are not
// visible in this excerpt - confirm).
56 constructor (worker
: Worker
, tasksQueueBackPressureSize
: number) {
// NOTE(review): the guard in front of this throw is not visible here;
// presumably a null/undefined check on `worker` - confirm.
58 throw new TypeError('Cannot construct a worker node without a worker')
// `== null` matches both null and undefined.
61 if (tasksQueueBackPressureSize
== null) {
63 'Cannot construct a worker node without a tasks queue back pressure size'
// Rejects non-integers as well as NaN, Infinity and integers outside the
// safe integer range.
66 if (!Number.isSafeInteger(tasksQueueBackPressureSize
)) {
68 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
// NOTE(review): the assignment of `this.worker` is not visible in this excerpt.
72 this.info
= this.initWorkerInfo(worker
)
73 this.usage
= this.initWorkerUsage()
// A message channel is only created for workers whose type is
// WorkerTypes.thread.
74 if (this.info
.type === WorkerTypes
.thread
) {
75 this.messageChannel
= new MessageChannel()
77 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
78 this.tasksQueue
= new Deque
<Task
<Data
>>()
79 this.onEmptyQueueCount
= 0
80 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
// Returns the current number of tasks held in the tasks queue.
84 public tasksQueueSize (): number {
85 return this.tasksQueue
.size
// Appends a task at the back of the tasks queue, then notifies the
// onBackPressure callback (if one is set) when the queue has back pressure.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the value returned by push() (`tasksQueueSize`) is returned, as
// unshiftTask() does - confirm.
89 public enqueueTask (task
: Task
<Data
>): number {
90 const tasksQueueSize
= this.tasksQueue
.push(task
)
// The back pressure check runs after the push, so it accounts for the new task.
91 if (this.onBackPressure
!= null && this.hasBackPressure()) {
// The callback receives the worker id taken from the worker info.
92 this.onBackPressure(this.info
.id
as number)
// Inserts a task at the front of the tasks queue, then notifies the
// onBackPressure callback (if one is set) when the queue has back pressure.
// Returns the value returned by the queue's unshift() call.
98 public unshiftTask (task
: Task
<Data
>): number {
99 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
// The back pressure check runs after the insertion, so it accounts for the
// new task.
100 if (this.onBackPressure
!= null && this.hasBackPressure()) {
// The callback receives the worker id taken from the worker info.
101 this.onBackPressure(this.info
.id
as number)
103 return tasksQueueSize
// Removes a task from the front of the tasks queue (Deque.shift()).
// When an onEmptyQueue callback is set and the queue is empty afterwards,
// the empty queue notification cycle is started.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `task` is returned - confirm.
107 public dequeueTask (): Task
<Data
> | undefined {
108 const task
= this.tasksQueue
.shift()
109 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
// Not awaited: the cycle runs in the background and any rejection is passed
// to EMPTY_FUNCTION.
110 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
// Removes a task from the back of the tasks queue (Deque.pop()).
// When an onEmptyQueue callback is set and the queue is empty afterwards,
// the empty queue notification cycle is started.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `task` is returned - confirm.
116 public popTask (): Task
<Data
> | undefined {
117 const task
= this.tasksQueue
.pop()
118 if (this.onEmptyQueue
!= null && this.tasksQueue
.size
=== 0) {
// Not awaited: the cycle runs in the background and any rejection is passed
// to EMPTY_FUNCTION.
119 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
// Empties the tasks queue by calling clear() on it.
125 public clearTasksQueue (): void {
126 this.tasksQueue
.clear()
// Tells whether the tasks queue has back pressure, i.e. whether its size is
// greater than or equal to tasksQueueBackPressureSize.
130 public hasBackPressure (): boolean {
131 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
// Resets the usage statistics: the worker usage is replaced by a freshly
// initialized object and every per task function usage entry is dropped.
135 public resetUsage (): void {
136 this.usage
= this.initWorkerUsage()
137 this.taskFunctionsUsage
.clear()
// Tears down the message channel, if there is one: unref() then close() are
// called on both ports, and the messageChannel property is removed.
// Does nothing when no message channel is set.
141 public closeChannel (): void {
142 if (this.messageChannel
!= null) {
// NOTE(review): the optional chaining (`?.`) below is redundant inside this
// null guard.
143 this.messageChannel
?.port1
.unref()
144 this.messageChannel
?.port2
.unref()
145 this.messageChannel
?.port1
.close()
146 this.messageChannel
?.port2
.close()
// Removes the property, so a later call to this method is a no-op.
147 delete this.messageChannel
// Returns the usage statistics of the given task function on this worker,
// creating and caching them on first access.
// NOTE(review): the statements that raise the two error messages below are
// not visible in this excerpt; presumably each message is thrown as an
// error - confirm.
152 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
// First precondition: info.taskFunctions must be an array.
153 if (!Array.isArray(this.info
.taskFunctions
)) {
155 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
// Second precondition: the list must hold at least 3 elements.
159 Array.isArray(this.info
.taskFunctions
) &&
160 this.info
.taskFunctions
.length
< 3
163 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
// The default task name is an alias of the task function name stored at
// index 1 of the list, so usage is tracked under that name.
166 if (name
=== DEFAULT_TASK_NAME
) {
167 name
= this.info
.taskFunctions
[1]
// Lazy initialization of the usage entry for that name.
169 if (!this.taskFunctionsUsage
.has(name
)) {
170 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
172 return this.taskFunctionsUsage
.get(name
)
// Empty queue notification cycle: calls the onEmptyQueue callback with the
// worker id, increments onEmptyQueueCount, sleeps for
// exponentialDelay(onEmptyQueueCount), then calls itself again.
// Callers (dequeueTask()/popTask()) only start it when onEmptyQueue is set.
175 private async startOnEmptyQueue (): Promise
<void> {
// Condition under which the counter is reset: at least one notification was
// sent and the worker is executing tasks or has queued tasks again.
// NOTE(review): the enclosing `if (` and whatever follows the reset are not
// visible in this excerpt; presumably an early return ends the cycle here,
// otherwise the recursion below has no visible exit - confirm.
177 this.onEmptyQueueCount
> 0 &&
178 (this.usage
.tasks
.executing
> 0 || this.tasksQueue
.size
> 0)
180 this.onEmptyQueueCount
= 0
// The cast assumes onEmptyQueue is defined.
183 (this.onEmptyQueue
as EmptyQueueCallback
)(this.info
.id
as number)
184 ++this.onEmptyQueueCount
// The delay is derived from the number of notifications sent so far.
185 await sleep(exponentialDelay(this.onEmptyQueueCount
))
186 await this.startOnEmptyQueue()
// Builds the worker info of the given worker: `id` comes from
// getWorkerId(worker) and `type` from getWorkerType(worker).
// NOTE(review): any further fields of the returned object are not visible in
// this excerpt.
189 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
191 id
: getWorkerId(worker
),
// The result of getWorkerType() is asserted to be a WorkerType.
192 type: getWorkerType(worker
) as WorkerType
,
// Builds a fresh worker usage object.
// Its `queued` and `maxQueued` members are getters, so they always reflect
// the current state of the tasks queue rather than a snapshot.
// NOTE(review): the enclosing object structure and its other fields are not
// visible in this excerpt.
198 private initWorkerUsage (): WorkerUsage
{
// Arrow functions capture `this`, so the getters below can reach the tasks
// queue of this worker node.
199 const getTasksQueueSize
= (): number => {
200 return this.tasksQueue
.size
202 const getTasksQueueMaxSize
= (): number => {
203 return this.tasksQueue
.maxSize
// Current size of the tasks queue.
209 get
queued (): number {
210 return getTasksQueueSize()
// `maxSize` as reported by the tasks queue.
212 get
maxQueued (): number {
213 return getTasksQueueMaxSize()
// Each of the four `history` fields below starts with its own new
// CircularArray; the properties they belong to are not visible here.
219 history
: new CircularArray()
222 history
: new CircularArray()
226 history
: new CircularArray()
229 history
: new CircularArray()
// Builds a fresh usage object for the task function with the given name.
// Its `queued` member is a getter that counts the queued tasks belonging to
// that task function.
// NOTE(review): the enclosing object structure, its other fields and the end
// of this definition are not visible in this excerpt.
235 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Walks the whole tasks queue on every call.
236 const getTaskFunctionQueueSize
= (): number => {
237 let taskFunctionQueueSize
= 0
238 for (const task
of this.tasksQueue
) {
// A queued task is counted when either:
// - it carries the default task name and `name` is the task function name
//   stored at index 1 of info.taskFunctions, or
// - it carries a non-default name equal to `name`.
// The cast assumes info.taskFunctions is defined at this point.
240 (task
.name
=== DEFAULT_TASK_NAME
&&
241 name
=== (this.info
.taskFunctions
as string[])[1]) ||
242 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
244 ++taskFunctionQueueSize
247 return taskFunctionQueueSize
// Number of queued tasks for this task function, computed on each read.
253 get
queued (): number {
254 return getTaskFunctionQueueSize()
// Each of the four `history` fields below starts with its own new
// CircularArray; the properties they belong to are not visible here.
260 history
: new CircularArray()
263 history
: new CircularArray()
267 history
: new CircularArray()
270 history
: new CircularArray()