1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
12 import { Deque
} from
'../deque'
18 type WorkerNodeEventDetail
,
23 import { checkWorkerNodeArguments
} from
'./utils'
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
33 implements IWorkerNode
<Worker
, Data
> {
35 public readonly worker
: Worker
37 public readonly info
: WorkerInfo
39 public usage
: WorkerUsage
41 public strategyData
?: StrategyData
43 public messageChannel
?: MessageChannel
45 public tasksQueueBackPressureSize
: number
46 private readonly tasksQueue
: Deque
<Task
<Data
>>
47 private onBackPressureStarted
: boolean
48 private onEmptyQueueCount
: number
49 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
52 * Constructs a new worker node.
54 * @param worker - The worker.
55 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
57 constructor (worker
: Worker
, tasksQueueBackPressureSize
: number) {
59 checkWorkerNodeArguments
<Worker
>(worker
, tasksQueueBackPressureSize
)
61 this.info
= this.initWorkerInfo(worker
)
62 this.usage
= this.initWorkerUsage()
63 if (this.info
.type === WorkerTypes
.thread
) {
64 this.messageChannel
= new MessageChannel()
66 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
67 this.tasksQueue
= new Deque
<Task
<Data
>>()
68 this.onBackPressureStarted
= false
69 this.onEmptyQueueCount
= 0
70 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
74 public tasksQueueSize (): number {
75 return this.tasksQueue
.size
79 public enqueueTask (task
: Task
<Data
>): number {
80 const tasksQueueSize
= this.tasksQueue
.push(task
)
81 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
82 this.onBackPressureStarted
= true
84 new CustomEvent
<WorkerNodeEventDetail
>('backpressure', {
85 detail
: { workerId
: this.info
.id
as number }
88 this.onBackPressureStarted
= false
94 public unshiftTask (task
: Task
<Data
>): number {
95 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
96 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
97 this.onBackPressureStarted
= true
99 new CustomEvent
<WorkerNodeEventDetail
>('backpressure', {
100 detail
: { workerId
: this.info
.id
as number }
103 this.onBackPressureStarted
= false
105 return tasksQueueSize
109 public dequeueTask (): Task
<Data
> | undefined {
110 const task
= this.tasksQueue
.shift()
111 if (this.tasksQueue
.size
=== 0 && this.onEmptyQueueCount
=== 0) {
112 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
118 public popTask (): Task
<Data
> | undefined {
119 const task
= this.tasksQueue
.pop()
120 if (this.tasksQueue
.size
=== 0 && this.onEmptyQueueCount
=== 0) {
121 this.startOnEmptyQueue().catch(EMPTY_FUNCTION
)
127 public clearTasksQueue (): void {
128 this.tasksQueue
.clear()
132 public hasBackPressure (): boolean {
133 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
137 public resetUsage (): void {
138 this.usage
= this.initWorkerUsage()
139 this.taskFunctionsUsage
.clear()
143 public closeChannel (): void {
144 if (this.messageChannel
!= null) {
145 this.messageChannel
.port1
.unref()
146 this.messageChannel
.port2
.unref()
147 this.messageChannel
.port1
.close()
148 this.messageChannel
.port2
.close()
149 delete this.messageChannel
154 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
155 if (!Array.isArray(this.info
.taskFunctionNames
)) {
157 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
161 Array.isArray(this.info
.taskFunctionNames
) &&
162 this.info
.taskFunctionNames
.length
< 3
165 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
168 if (name
=== DEFAULT_TASK_NAME
) {
169 name
= this.info
.taskFunctionNames
[1]
171 if (!this.taskFunctionsUsage
.has(name
)) {
172 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
174 return this.taskFunctionsUsage
.get(name
)
178 public deleteTaskFunctionWorkerUsage (name
: string): boolean {
179 return this.taskFunctionsUsage
.delete(name
)
182 private async startOnEmptyQueue (): Promise
<void> {
184 this.onEmptyQueueCount
> 0 &&
185 (this.usage
.tasks
.executing
> 0 || this.tasksQueue
.size
> 0)
187 this.onEmptyQueueCount
= 0
190 ++this.onEmptyQueueCount
192 new CustomEvent
<WorkerNodeEventDetail
>('emptyqueue', {
193 detail
: { workerId
: this.info
.id
as number }
196 await sleep(exponentialDelay(this.onEmptyQueueCount
))
197 await this.startOnEmptyQueue()
200 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
202 id
: getWorkerId(worker
),
203 type: getWorkerType(worker
) as WorkerType
,
209 private initWorkerUsage (): WorkerUsage
{
210 const getTasksQueueSize
= (): number => {
211 return this.tasksQueue
.size
213 const getTasksQueueMaxSize
= (): number => {
214 return this.tasksQueue
.maxSize
220 get
queued (): number {
221 return getTasksQueueSize()
223 get
maxQueued (): number {
224 return getTasksQueueMaxSize()
230 history
: new CircularArray
<number>()
233 history
: new CircularArray
<number>()
237 history
: new CircularArray
<number>()
240 history
: new CircularArray
<number>()
246 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
247 const getTaskFunctionQueueSize
= (): number => {
248 let taskFunctionQueueSize
= 0
249 for (const task
of this.tasksQueue
) {
251 (task
.name
=== DEFAULT_TASK_NAME
&&
252 name
=== (this.info
.taskFunctionNames
as string[])[1]) ||
253 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
255 ++taskFunctionQueueSize
258 return taskFunctionQueueSize
264 get
queued (): number {
265 return getTaskFunctionQueueSize()
271 history
: new CircularArray
<number>()
274 history
: new CircularArray
<number>()
278 history
: new CircularArray
<number>()
281 history
: new CircularArray
<number>()