1 import { MessageChannel
} from
'node:worker_threads'
2 import { CircularArray
} from
'../circular-array'
3 import type { Task
} from
'../utility-types'
4 import { DEFAULT_TASK_NAME
, getWorkerId
, getWorkerType
} from
'../utils'
5 import { Deque
} from
'../deque'
11 type WorkerNodeEventDetail
,
16 import { checkWorkerNodeArguments
} from
'./utils'
21 * @typeParam Worker - Type of worker.
22 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
24 export class WorkerNode
<Worker
extends IWorker
, Data
= unknown
>
26 implements IWorkerNode
<Worker
, Data
> {
28 public readonly worker
: Worker
30 public readonly info
: WorkerInfo
32 public usage
: WorkerUsage
34 public strategyData
?: StrategyData
36 public messageChannel
?: MessageChannel
38 public tasksQueueBackPressureSize
: number
39 private readonly tasksQueue
: Deque
<Task
<Data
>>
40 private onBackPressureStarted
: boolean
41 private readonly taskFunctionsUsage
: Map
<string, WorkerUsage
>
44 * Constructs a new worker node.
46 * @param worker - The worker.
47 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
49 constructor (worker
: Worker
, tasksQueueBackPressureSize
: number) {
51 checkWorkerNodeArguments
<Worker
>(worker
, tasksQueueBackPressureSize
)
53 this.info
= this.initWorkerInfo(worker
)
54 this.usage
= this.initWorkerUsage()
55 if (this.info
.type === WorkerTypes
.thread
) {
56 this.messageChannel
= new MessageChannel()
58 this.tasksQueueBackPressureSize
= tasksQueueBackPressureSize
59 this.tasksQueue
= new Deque
<Task
<Data
>>()
60 this.onBackPressureStarted
= false
61 this.taskFunctionsUsage
= new Map
<string, WorkerUsage
>()
65 public tasksQueueSize (): number {
66 return this.tasksQueue
.size
70 public enqueueTask (task
: Task
<Data
>): number {
71 const tasksQueueSize
= this.tasksQueue
.push(task
)
72 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
73 this.onBackPressureStarted
= true
75 new CustomEvent
<WorkerNodeEventDetail
>('backPressure', {
76 detail
: { workerId
: this.info
.id
as number }
79 this.onBackPressureStarted
= false
85 public unshiftTask (task
: Task
<Data
>): number {
86 const tasksQueueSize
= this.tasksQueue
.unshift(task
)
87 if (this.hasBackPressure() && !this.onBackPressureStarted
) {
88 this.onBackPressureStarted
= true
90 new CustomEvent
<WorkerNodeEventDetail
>('backPressure', {
91 detail
: { workerId
: this.info
.id
as number }
94 this.onBackPressureStarted
= false
100 public dequeueTask (): Task
<Data
> | undefined {
101 return this.tasksQueue
.shift()
105 public popTask (): Task
<Data
> | undefined {
106 return this.tasksQueue
.pop()
110 public clearTasksQueue (): void {
111 this.tasksQueue
.clear()
115 public hasBackPressure (): boolean {
116 return this.tasksQueue
.size
>= this.tasksQueueBackPressureSize
120 public resetUsage (): void {
121 this.usage
= this.initWorkerUsage()
122 this.taskFunctionsUsage
.clear()
126 public closeChannel (): void {
127 if (this.messageChannel
!= null) {
128 this.messageChannel
.port1
.unref()
129 this.messageChannel
.port2
.unref()
130 this.messageChannel
.port1
.close()
131 this.messageChannel
.port2
.close()
132 delete this.messageChannel
137 public getTaskFunctionWorkerUsage (name
: string): WorkerUsage
| undefined {
138 if (!Array.isArray(this.info
.taskFunctionNames
)) {
140 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
144 Array.isArray(this.info
.taskFunctionNames
) &&
145 this.info
.taskFunctionNames
.length
< 3
148 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
151 if (name
=== DEFAULT_TASK_NAME
) {
152 name
= this.info
.taskFunctionNames
[1]
154 if (!this.taskFunctionsUsage
.has(name
)) {
155 this.taskFunctionsUsage
.set(name
, this.initTaskFunctionWorkerUsage(name
))
157 return this.taskFunctionsUsage
.get(name
)
161 public deleteTaskFunctionWorkerUsage (name
: string): boolean {
162 return this.taskFunctionsUsage
.delete(name
)
165 private initWorkerInfo (worker
: Worker
): WorkerInfo
{
167 id
: getWorkerId(worker
),
168 type: getWorkerType(worker
) as WorkerType
,
174 private initWorkerUsage (): WorkerUsage
{
175 const getTasksQueueSize
= (): number => {
176 return this.tasksQueue
.size
178 const getTasksQueueMaxSize
= (): number => {
179 return this.tasksQueue
.maxSize
185 get
queued (): number {
186 return getTasksQueueSize()
188 get
maxQueued (): number {
189 return getTasksQueueMaxSize()
191 sequentiallyStolen
: 0,
196 history
: new CircularArray
<number>()
199 history
: new CircularArray
<number>()
203 history
: new CircularArray
<number>()
206 history
: new CircularArray
<number>()
212 private initTaskFunctionWorkerUsage (name
: string): WorkerUsage
{
213 const getTaskFunctionQueueSize
= (): number => {
214 let taskFunctionQueueSize
= 0
215 for (const task
of this.tasksQueue
) {
217 (task
.name
=== DEFAULT_TASK_NAME
&&
218 name
=== (this.info
.taskFunctionNames
as string[])[1]) ||
219 (task
.name
!== DEFAULT_TASK_NAME
&& name
=== task
.name
)
221 ++taskFunctionQueueSize
224 return taskFunctionQueueSize
230 get
queued (): number {
231 return getTaskFunctionQueueSize()
233 sequentiallyStolen
: 0,
238 history
: new CircularArray
<number>()
241 history
: new CircularArray
<number>()
245 history
: new CircularArray
<number>()
248 history
: new CircularArray
<number>()