1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
12 import { Deque
} from
'../deque'
18 type WorkerNodeEventCallback
,
23 import { checkWorkerNodeArguments
} from
'./utils'
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
// Worker node: wraps one worker together with its info, usage statistics and tasks queue.
31 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
32 implements IWorkerNode
<Worker
, Data
> {
// The worker handled by this node (constructor argument).
34 public readonly worker
: Worker
// Worker information, built once in the constructor by initWorkerInfo().
36 public readonly info
: WorkerInfo
// Worker usage statistics, built by initWorkerUsage() and rebuilt by resetUsage().
38 public usage
: WorkerUsage
// Optional strategy data; never written by the code visible in this class.
40 public strategyData
?: StrategyData
// Created in the constructor only when info.type === WorkerTypes.thread; removed by closeChannel().
42 public messageChannel
?: MessageChannel
// Queue size at or above which hasBackPressure() reports true.
44 public tasksQueueBackPressureSize
: number
// Optional callback invoked with the worker id by enqueueTask()/unshiftTask() on back pressure.
46 public onBackPressure
?: WorkerNodeEventCallback
// Optional callback invoked with the worker id by startOnEmptyQueue().
48 public onEmptyQueue
?: WorkerNodeEventCallback
// Tasks waiting on this worker node.
49 private readonly tasksQueue
: Deque
<Task
<Data
>>
// True while the onBackPressure callback is running; blocks re-entrant notifications.
50 private onBackPressureStarted
: boolean
// Number of empty-queue notifications sent in the current cycle; 0 means no cycle is running.
51 private onEmptyQueueCount
: number
// Usage statistics per task function name, filled lazily by getTaskFunctionWorkerUsage().
52 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
/**
 * Constructs a new worker node.
 *
 * @param worker - The worker.
 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
 */
constructor (worker: Worker, tasksQueueBackPressureSize: number) {
  // Argument checking is delegated to the helper imported from './utils'
  // (presumably it throws on invalid input — confirm against that module).
  checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
  // The readonly `worker` field has no initializer, so it must be assigned here.
  this.worker = worker
  this.info = this.initWorkerInfo(worker)
  this.usage = this.initWorkerUsage()
  // Only thread workers get a dedicated message channel.
  if (this.info.type === WorkerTypes.thread) {
    this.messageChannel = new MessageChannel()
  }
  this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
  this.tasksQueue = new Deque<Task<Data>>()
  this.onBackPressureStarted = false
  this.onEmptyQueueCount = 0
  this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/**
 * Gets the current size of the tasks queue.
 *
 * @returns The `size` reported by the tasks queue.
 */
public tasksQueueSize (): number {
  const { size } = this.tasksQueue
  return size
}
/**
 * Adds a task to the tasks queue through the queue's `push()`.
 *
 * When an `onBackPressure` callback is registered, the queue has reached the
 * back pressure size and no back pressure notification is already running,
 * the callback is invoked with the worker id.
 *
 * @param task - The task to enqueue.
 * @returns The value returned by the queue's `push()`.
 */
public enqueueTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.push(task)
  if (
    this.onBackPressure != null &&
    this.hasBackPressure() &&
    !this.onBackPressureStarted
  ) {
    // The flag prevents re-entrant notifications while the callback runs.
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      // Always release the guard, otherwise a throwing callback would
      // disable every later back pressure notification.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Adds a task to the tasks queue through the queue's `unshift()`.
 *
 * When an `onBackPressure` callback is registered, the queue has reached the
 * back pressure size and no back pressure notification is already running,
 * the callback is invoked with the worker id.
 *
 * @param task - The task to add.
 * @returns The value returned by the queue's `unshift()`.
 */
public unshiftTask (task: Task<Data>): number {
  const tasksQueueSize = this.tasksQueue.unshift(task)
  if (
    this.onBackPressure != null &&
    this.hasBackPressure() &&
    !this.onBackPressureStarted
  ) {
    // The flag prevents re-entrant notifications while the callback runs.
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      // Always release the guard, otherwise a throwing callback would
      // disable every later back pressure notification.
      this.onBackPressureStarted = false
    }
  }
  return tasksQueueSize
}
/**
 * Removes a task from the tasks queue through the queue's `shift()`.
 *
 * If an `onEmptyQueue` callback is registered, the queue is empty afterwards
 * and no empty-queue notification cycle is running (`onEmptyQueueCount === 0`),
 * a cycle is started in the background; its rejections are discarded.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public dequeueTask (): Task<Data> | undefined {
  const task = this.tasksQueue.shift()
  if (
    this.onEmptyQueue != null &&
    this.tasksQueue.size === 0 &&
    this.onEmptyQueueCount === 0
  ) {
    // Fire and forget: the notification cycle must not delay the caller.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return task
}
/**
 * Removes a task from the tasks queue through the queue's `pop()`.
 *
 * If an `onEmptyQueue` callback is registered, the queue is empty afterwards
 * and no empty-queue notification cycle is running (`onEmptyQueueCount === 0`),
 * a cycle is started in the background; its rejections are discarded.
 *
 * @returns The removed task, or `undefined` when the queue yields none.
 */
public popTask (): Task<Data> | undefined {
  const task = this.tasksQueue.pop()
  if (
    this.onEmptyQueue != null &&
    this.tasksQueue.size === 0 &&
    this.onEmptyQueueCount === 0
  ) {
    // Fire and forget: the notification cycle must not delay the caller.
    this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
  }
  return task
}
/**
 * Empties the tasks queue by calling its `clear()` method.
 */
public clearTasksQueue (): void {
  const queue = this.tasksQueue
  queue.clear()
}
/**
 * Tells whether the tasks queue is under back pressure.
 *
 * @returns `true` when the queue size is greater than or equal to
 * `tasksQueueBackPressureSize`, `false` otherwise.
 */
public hasBackPressure (): boolean {
  const { size: queuedTasks } = this.tasksQueue
  return queuedTasks >= this.tasksQueueBackPressureSize
}
/**
 * Resets the usage statistics: the worker usage is rebuilt from scratch and
 * every per task function usage entry is dropped.
 */
public resetUsage (): void {
  const freshUsage = this.initWorkerUsage()
  this.usage = freshUsage
  this.taskFunctionsUsage.clear()
}
/**
 * Closes the message channel, if any: both ports are unreferenced, then
 * closed, and the `messageChannel` property is deleted from the node.
 */
public closeChannel (): void {
  const channel = this.messageChannel
  if (channel == null) {
    // No channel to close.
    return
  }
  const { port1, port2 } = channel
  port1.unref()
  port2.unref()
  port1.close()
  port2.close()
  delete this.messageChannel
}
/**
 * Gets the usage statistics of a task function, creating them on first access.
 *
 * `DEFAULT_TASK_NAME` is resolved to the second entry of the task function
 * names list before the lookup.
 *
 * @param name - The task function name.
 * @returns The usage statistics stored for the resolved task function name.
 * @throws If the task function names list is not an array or holds fewer than 3 elements.
 */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
  const { taskFunctionNames } = this.info
  if (!Array.isArray(taskFunctionNames)) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
    )
  }
  if (taskFunctionNames.length < 3) {
    throw new Error(
      `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
    )
  }
  const usageKey = name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
  let taskFunctionUsage = this.taskFunctionsUsage.get(usageKey)
  if (taskFunctionUsage == null) {
    // First access for this task function: create and remember its usage.
    taskFunctionUsage = this.initTaskFunctionWorkerUsage(usageKey)
    this.taskFunctionsUsage.set(usageKey, taskFunctionUsage)
  }
  return taskFunctionUsage
}
/**
 * Removes the usage statistics stored for a task function.
 *
 * @param name - The task function name.
 * @returns `true` when an entry existed and was removed, `false` otherwise.
 */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
  const removed = this.taskFunctionsUsage.delete(name)
  return removed
}
/**
 * Runs an empty-queue notification cycle.
 *
 * On each round the notification counter is incremented, the `onEmptyQueue`
 * callback (if any) is invoked with the worker id, and the method waits for
 * `exponentialDelay(onEmptyQueueCount)` before the next round. The cycle stops,
 * and the counter is reset to 0, once at least one notification has been sent
 * and the worker has tasks executing or queued again.
 *
 * Implemented as a loop rather than self-recursion so a long idle period does
 * not accumulate a chain of pending promises.
 */
private async startOnEmptyQueue (): Promise<void> {
  const cycleIsOver = (): boolean =>
    this.onEmptyQueueCount > 0 &&
    (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
  while (!cycleIsOver()) {
    ++this.onEmptyQueueCount
    this.onEmptyQueue?.(this.info.id as number)
    await sleep(exponentialDelay(this.onEmptyQueueCount))
  }
  // A zero counter lets dequeueTask()/popTask() start a new cycle later.
  this.onEmptyQueueCount = 0
}
// Builds the initial worker information for the given worker.
// NOTE(review): only the `id` and `type` fields are visible in this excerpt; the
// trailing comma suggests further fields follow — confirm against the full file.
206 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
// Worker identifier, as computed by getWorkerId().
208 id
: getWorkerId(worker
),
// Worker type, as computed by getWorkerType() and asserted to be a WorkerType.
209 type: getWorkerType(worker
) as WorkerType
,
// Builds a fresh worker usage object.
// NOTE(review): the keys enclosing the getters and the `history` entries are not
// visible in this excerpt — confirm the object layout against the full file.
215 private initWorkerUsage (): WorkerUsage
{
// Closures so that the getters below read the live queue state on every access.
216 const getTasksQueueSize
= (): number => {
217 return this.tasksQueue
.size
219 const getTasksQueueMaxSize
= (): number => {
220 return this.tasksQueue
.maxSize
// `queued` is computed on read: current size of the tasks queue.
226 get
queued (): number {
227 return getTasksQueueSize()
// `maxQueued` is computed on read: `maxSize` reported by the tasks queue.
229 get
maxQueued (): number {
230 return getTasksQueueMaxSize()
// Each `history` below starts as a new, empty CircularArray of numbers.
236 history
: new CircularArray
<number>()
239 history
: new CircularArray
<number>()
243 history
: new CircularArray
<number>()
246 history
: new CircularArray
<number>()
// Builds a fresh usage object for the task function called `name`.
// NOTE(review): the keys enclosing the getter and the `history` entries are not
// visible in this excerpt — confirm the object layout against the full file.
252 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
// Counts, on every call, the queued tasks that belong to this task function.
253 const getTaskFunctionQueueSize
= (): number => {
254 let taskFunctionQueueSize
= 0
255 for (const task
of this.tasksQueue
) {
// A task named DEFAULT_TASK_NAME counts when `name` is the second entry of the
// task function names list; any other task counts when its name equals `name`.
257 (task
.name
=== DEFAULT_TASK_NAME
&&
258 name
=== (this.info
.taskFunctionNames
as string[])[1]) ||
259 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
261 ++taskFunctionQueueSize
264 return taskFunctionQueueSize
// `queued` is computed on read from the live tasks queue.
270 get
queued (): number {
271 return getTaskFunctionQueueSize()
// Each `history` below starts as a new, empty CircularArray of numbers.
277 history
: new CircularArray
<number>()
280 history
: new CircularArray
<number>()
284 history
: new CircularArray
<number>()
287 history
: new CircularArray
<number>()