docs: update benchmarks vs. external pools
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 sleep
9} from '../utils'
574b351d 10import { Deque } from '../deque'
4b628b48 11import {
ec287edf
JB
12 type BackPressureCallback,
13 type EmptyQueueCallback,
4b628b48
JB
14 type IWorker,
15 type IWorkerNode,
4b628b48
JB
16 type WorkerInfo,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20} from './worker'
21
60664f48
JB
22/**
23 * Worker node.
24 *
25 * @typeParam Worker - Type of worker.
26 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
27 */
4b628b48
JB
28export class WorkerNode<Worker extends IWorker, Data = unknown>
29implements IWorkerNode<Worker, Data> {
671d5154 30 /** @inheritdoc */
4b628b48 31 public readonly worker: Worker
671d5154 32 /** @inheritdoc */
4b628b48 33 public readonly info: WorkerInfo
671d5154 34 /** @inheritdoc */
4b628b48 35 public usage: WorkerUsage
20c6f652 36 /** @inheritdoc */
26fb3c18
JB
37 public messageChannel?: MessageChannel
38 /** @inheritdoc */
20c6f652 39 public tasksQueueBackPressureSize: number
72695f86 40 /** @inheritdoc */
6e9c39d3 41 public onBackPressure?: BackPressureCallback
dd951876 42 /** @inheritdoc */
6e9c39d3 43 public onEmptyQueue?: EmptyQueueCallback
574b351d 44 private readonly tasksQueue: Deque<Task<Data>>
68cbdc84 45 private onEmptyQueueCount: number
26fb3c18 46 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 47
60664f48
JB
48 /**
49 * Constructs a new worker node.
50 *
51 * @param worker - The worker.
52 * @param workerType - The worker type.
20c6f652 53 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
60664f48 54 */
20c6f652
JB
55 constructor (
56 worker: Worker,
57 workerType: WorkerType,
58 tasksQueueBackPressureSize: number
59 ) {
8735b4e5 60 if (worker == null) {
e695d66f 61 throw new TypeError('Cannot construct a worker node without a worker')
8735b4e5
JB
62 }
63 if (workerType == null) {
e695d66f
JB
64 throw new TypeError(
65 'Cannot construct a worker node without a worker type'
66 )
8735b4e5 67 }
20c6f652 68 if (tasksQueueBackPressureSize == null) {
e695d66f 69 throw new TypeError(
20c6f652 70 'Cannot construct a worker node without a tasks queue back pressure size'
8735b4e5
JB
71 )
72 }
20c6f652 73 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
e695d66f 74 throw new TypeError(
20c6f652 75 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
8735b4e5
JB
76 )
77 }
4b628b48
JB
78 this.worker = worker
79 this.info = this.initWorkerInfo(worker, workerType)
26fb3c18 80 this.usage = this.initWorkerUsage()
7884d183
JB
81 if (workerType === WorkerTypes.thread) {
82 this.messageChannel = new MessageChannel()
83 }
20c6f652 84 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
26fb3c18 85 this.tasksQueue = new Deque<Task<Data>>()
68cbdc84 86 this.onEmptyQueueCount = 0
26fb3c18 87 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
88 }
89
90 /** @inheritdoc */
91 public tasksQueueSize (): number {
92 return this.tasksQueue.size
93 }
94
4b628b48
JB
95 /** @inheritdoc */
96 public enqueueTask (task: Task<Data>): number {
72695f86
JB
97 const tasksQueueSize = this.tasksQueue.push(task)
98 if (this.onBackPressure != null && this.hasBackPressure()) {
0741fbeb 99 this.onBackPressure(this.info.id as number)
72695f86
JB
100 }
101 return tasksQueueSize
102 }
103
104 /** @inheritdoc */
105 public unshiftTask (task: Task<Data>): number {
106 const tasksQueueSize = this.tasksQueue.unshift(task)
107 if (this.onBackPressure != null && this.hasBackPressure()) {
0741fbeb 108 this.onBackPressure(this.info.id as number)
72695f86
JB
109 }
110 return tasksQueueSize
4b628b48
JB
111 }
112
113 /** @inheritdoc */
114 public dequeueTask (): Task<Data> | undefined {
dd951876
JB
115 const task = this.tasksQueue.shift()
116 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
68cbdc84 117 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
dd951876
JB
118 }
119 return task
4b628b48
JB
120 }
121
72695f86
JB
122 /** @inheritdoc */
123 public popTask (): Task<Data> | undefined {
dd951876
JB
124 const task = this.tasksQueue.pop()
125 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
68cbdc84 126 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
dd951876
JB
127 }
128 return task
72695f86
JB
129 }
130
4b628b48
JB
131 /** @inheritdoc */
132 public clearTasksQueue (): void {
133 this.tasksQueue.clear()
134 }
135
671d5154
JB
136 /** @inheritdoc */
137 public hasBackPressure (): boolean {
8735b4e5 138 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
139 }
140
ff128cc9 141 /** @inheritdoc */
4b628b48
JB
142 public resetUsage (): void {
143 this.usage = this.initWorkerUsage()
db0e38ee 144 this.taskFunctionsUsage.clear()
ff128cc9
JB
145 }
146
3f09ed9f
JB
147 /** @inheritdoc */
148 public closeChannel (): void {
7884d183
JB
149 if (this.messageChannel != null) {
150 this.messageChannel?.port1.unref()
151 this.messageChannel?.port2.unref()
152 this.messageChannel?.port1.close()
153 this.messageChannel?.port2.close()
154 delete this.messageChannel
3f09ed9f
JB
155 }
156 }
157
ff128cc9 158 /** @inheritdoc */
db0e38ee 159 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
a5d15204 160 if (!Array.isArray(this.info.taskFunctions)) {
71b2b6d8 161 throw new Error(
db0e38ee 162 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
163 )
164 }
b558f6b5 165 if (
71b2b6d8 166 Array.isArray(this.info.taskFunctions) &&
db0e38ee 167 this.info.taskFunctions.length < 3
b558f6b5 168 ) {
db0e38ee
JB
169 throw new Error(
170 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
171 )
172 }
173 if (name === DEFAULT_TASK_NAME) {
71b2b6d8 174 name = this.info.taskFunctions[1]
b558f6b5 175 }
db0e38ee
JB
176 if (!this.taskFunctionsUsage.has(name)) {
177 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 178 }
db0e38ee 179 return this.taskFunctionsUsage.get(name)
4b628b48
JB
180 }
181
68cbdc84 182 private async startOnEmptyQueue (): Promise<void> {
1f0766e7
JB
183 if (
184 this.onEmptyQueueCount > 0 &&
79b197bb 185 (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
1f0766e7 186 ) {
60b7a7cc
JB
187 this.onEmptyQueueCount = 0
188 return
189 }
6e9c39d3 190 (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
60b7a7cc
JB
191 ++this.onEmptyQueueCount
192 await sleep(exponentialDelay(this.onEmptyQueueCount))
193 await this.startOnEmptyQueue()
68cbdc84
JB
194 }
195
4b628b48
JB
196 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
197 return {
198 id: this.getWorkerId(worker, workerType),
199 type: workerType,
200 dynamic: false,
7884d183 201 ready: false
4b628b48
JB
202 }
203 }
204
205 private initWorkerUsage (): WorkerUsage {
206 const getTasksQueueSize = (): number => {
dd951876 207 return this.tasksQueue.size
4b628b48 208 }
bf4ef2ca 209 const getTasksQueueMaxSize = (): number => {
dd951876 210 return this.tasksQueue.maxSize
4b628b48
JB
211 }
212 return {
213 tasks: {
214 executed: 0,
215 executing: 0,
216 get queued (): number {
217 return getTasksQueueSize()
218 },
219 get maxQueued (): number {
bf4ef2ca 220 return getTasksQueueMaxSize()
4b628b48 221 },
68cbdc84 222 stolen: 0,
4b628b48
JB
223 failed: 0
224 },
225 runTime: {
226 history: new CircularArray()
227 },
228 waitTime: {
229 history: new CircularArray()
230 },
231 elu: {
232 idle: {
233 history: new CircularArray()
234 },
235 active: {
236 history: new CircularArray()
237 }
238 }
239 }
240 }
241
db0e38ee 242 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
243 const getTaskFunctionQueueSize = (): number => {
244 let taskFunctionQueueSize = 0
b25a42cd 245 for (const task of this.tasksQueue) {
dd92a715 246 if (
e5ece61d
JB
247 (task.name === DEFAULT_TASK_NAME &&
248 name === (this.info.taskFunctions as string[])[1]) ||
249 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 250 ) {
e5ece61d 251 ++taskFunctionQueueSize
b25a42cd
JB
252 }
253 }
e5ece61d 254 return taskFunctionQueueSize
b25a42cd
JB
255 }
256 return {
257 tasks: {
258 executed: 0,
259 executing: 0,
260 get queued (): number {
e5ece61d 261 return getTaskFunctionQueueSize()
b25a42cd 262 },
68cbdc84 263 stolen: 0,
b25a42cd
JB
264 failed: 0
265 },
266 runTime: {
267 history: new CircularArray()
268 },
269 waitTime: {
270 history: new CircularArray()
271 },
272 elu: {
273 idle: {
274 history: new CircularArray()
275 },
276 active: {
277 history: new CircularArray()
278 }
279 }
280 }
281 }
282
4b628b48
JB
283 /**
284 * Gets the worker id.
285 *
286 * @param worker - The worker.
60664f48 287 * @param workerType - The worker type.
4b628b48
JB
288 * @returns The worker id.
289 */
290 private getWorkerId (
291 worker: Worker,
292 workerType: WorkerType
293 ): number | undefined {
294 if (workerType === WorkerTypes.thread) {
295 return worker.threadId
296 } else if (workerType === WorkerTypes.cluster) {
297 return worker.id
298 }
299 }
300}