Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48
JB
13import {
14 type IWorker,
15 type IWorkerNode,
f3a91bac 16 type StrategyData,
4b628b48 17 type WorkerInfo,
a9780ad2 18 type WorkerNodeEventCallback,
4b628b48
JB
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22} from './worker'
23
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics and its tasks
 * queue, and invokes the optional `onBackPressure` / `onEmptyQueue` callbacks
 * when the tasks queue reaches its back pressure size or becomes empty.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: WorkerNodeEventCallback
  /** @inheritdoc */
  public onEmptyQueue?: WorkerNodeEventCallback
  /** Tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Guard set while the `onBackPressure` callback is running, so that it is not re-entered. */
  private onBackPressureStarted: boolean
  /** Number of `onEmptyQueue` notifications sent in the running cycle; `0` means no cycle is running. */
  private onEmptyQueueCount: number
  /** Usage statistics per task function name, created on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws {TypeError} If the worker or the back pressure size is missing, or if the back pressure size is not a safe integer.
   * @throws {RangeError} If the back pressure size is not strictly positive.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      const { port1, port2 } = this.messageChannel
      port1.unref()
      port2.unref()
      port1.close()
      port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    if (!Array.isArray(this.info.taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (this.info.taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    if (name === DEFAULT_TASK_NAME) {
      // NOTE(review): the default task name is resolved to the second entry of
      // the task function names list; presumably the first entry is the
      // default task name itself - confirm against the worker implementation.
      name = this.info.taskFunctionNames[1]
    }
    // Usage statistics are created lazily, on the first request for a name.
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Invokes the `onBackPressure` callback with the worker id when a callback
   * is registered, the tasks queue has back pressure and the callback is not
   * already running.
   */
  private notifyBackPressure (): void {
    if (
      this.onBackPressure == null ||
      !this.hasBackPressure() ||
      this.onBackPressureStarted
    ) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      // Always release the guard: a throwing callback must not disable every
      // later back pressure notification.
      this.onBackPressureStarted = false
    }
  }

  /**
   * Starts an empty queue notification cycle when a callback is registered,
   * the tasks queue is empty and no cycle is already running.
   */
  private notifyEmptyQueue (): void {
    if (
      this.onEmptyQueue != null &&
      this.tasksQueue.size === 0 &&
      this.onEmptyQueueCount === 0
    ) {
      // Fire and forget: a failing cycle must not break the dequeue operation.
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Runs an empty queue notification cycle: invokes the `onEmptyQueue`
   * callback with the worker id, waits for a delay computed from the number of
   * notifications already sent, and repeats until, after at least one
   * notification, the worker is executing tasks or has queued tasks.
   *
   * @returns A promise settled when the cycle ends. It rejects if the callback or the wait throws.
   */
  private async startOnEmptyQueue (): Promise<void> {
    try {
      // A loop is used instead of self-recursion so that a long idle period
      // does not build an ever-growing chain of pending promises.
      for (;;) {
        if (
          this.onEmptyQueueCount > 0 &&
          (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
        ) {
          return
        }
        ++this.onEmptyQueueCount
        this.onEmptyQueue?.(this.info.id as number)
        await sleep(exponentialDelay(this.onEmptyQueueCount))
      }
    } finally {
      // Reset the counter on every exit path, failures included: a new cycle
      // is only started when the counter is zero.
      this.onEmptyQueueCount = 0
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose queue statistics read the tasks queue
   * live.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the object literal
    // getters below, `this` is the literal itself.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` statistic counts,
   * on each read, the queued tasks that belong to that task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          // A task queued under the default task name counts for the task
          // function found at index 1 of the task function names list.
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctionNames as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Validates the constructor arguments.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws {TypeError} If the worker or the back pressure size is `null` or `undefined`, or if the back pressure size is not a safe integer.
   * @throws {RangeError} If the back pressure size is lower than or equal to zero.
   */
  private checkWorkerNodeArguments (
    worker: Worker,
    tasksQueueBackPressureSize: number
  ): void {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (tasksQueueBackPressureSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    }
    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    }
    if (tasksQueueBackPressureSize <= 0) {
      throw new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    }
  }
}