build(deps): bump poolifier
[poolifier.git] / src / pools / worker-node.ts
Commit | Line | Data
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48
JB
13import {
14 type IWorker,
15 type IWorkerNode,
f3a91bac 16 type StrategyData,
4b628b48 17 type WorkerInfo,
a9780ad2 18 type WorkerNodeEventCallback,
4b628b48
JB
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22} from './worker'
9a38f99e 23import { checkWorkerNodeArguments } from './utils'
4b628b48 24
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics and its tasks queue,
 * and fires the optional `onBackPressure` / `onEmptyQueue` callbacks when the
 * queue fills up or drains.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: WorkerNodeEventCallback
  /** @inheritdoc */
  public onEmptyQueue?: WorkerNodeEventCallback
  /** Queue of the tasks waiting to be executed by the worker. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Re-entrancy guard: `true` while the `onBackPressure` callback is running. */
  private onBackPressureStarted: boolean
  /** Number of `onEmptyQueue` notifications sent in the current cycle (0 when no cycle is running). */
  private onEmptyQueueCount: number
  /** Per task function usage statistics, created lazily and keyed by task function name. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      const { port1, port2 } = this.messageChannel
      port1.unref()
      port2.unref()
      port1.close()
      port2.close()
      delete this.messageChannel
    }
  }

  /**
   * @inheritdoc
   * @throws Error if the task function names list is not yet defined or has less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of the list.
    const usageName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(usageName)) {
      this.taskFunctionsUsage.set(
        usageName,
        this.initTaskFunctionWorkerUsage(usageName)
      )
    }
    return this.taskFunctionsUsage.get(usageName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Invokes the `onBackPressure` callback if one is set, the tasks queue has
   * back pressure and the callback is not already running.
   *
   * An error thrown by the callback propagates to the caller, but the
   * re-entrancy guard is always released so later notifications still fire.
   */
  private notifyBackPressure (): void {
    if (
      this.onBackPressure != null &&
      this.hasBackPressure() &&
      !this.onBackPressureStarted
    ) {
      this.onBackPressureStarted = true
      try {
        this.onBackPressure(this.info.id as number)
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Starts the empty queue notification cycle if an `onEmptyQueue` callback is
   * set, the tasks queue is empty and no cycle is already running.
   */
  private notifyEmptyQueue (): void {
    if (
      this.onEmptyQueue != null &&
      this.tasksQueue.size === 0 &&
      this.onEmptyQueueCount === 0
    ) {
      // Fire and forget: errors raised by the cycle are deliberately ignored.
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Repeatedly invokes the `onEmptyQueue` callback, sleeping for
   * `exponentialDelay(count)` between invocations, until at least one
   * notification was sent and the worker is executing or has queued tasks.
   *
   * Implemented as a loop rather than recursion so that a long idle period does
   * not build an ever-growing chain of pending promises. The notification
   * counter is always reset when the cycle ends, including on error, so that a
   * later cycle can start.
   */
  private async startOnEmptyQueue (): Promise<void> {
    const shallStop = (): boolean =>
      this.onEmptyQueueCount > 0 &&
      (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
    try {
      while (!shallStop()) {
        ++this.onEmptyQueueCount
        this.onEmptyQueue?.(this.info.id as number)
        await sleep(exponentialDelay(this.onEmptyQueueCount))
      }
    } finally {
      this.onEmptyQueueCount = 0
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics.
   *
   * `tasks.queued` and `tasks.maxQueued` are live getters reading the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function.
   *
   * `tasks.queued` is a live getter that walks the tasks queue and counts the
   * tasks belonging to the given task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          // Tasks queued under the default name count for the name stored at index 1.
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctionNames as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}