perf: improve node eligibility branching on worker choice strategies
[poolifier.git] / src / pools / worker-node.ts
Commit | Line | Data
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48 13import {
ec287edf
JB
14 type BackPressureCallback,
15 type EmptyQueueCallback,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
4b628b48
JB
18 type WorkerInfo,
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22} from './worker'
23
60664f48
JB
24/**
25 * Worker node.
26 *
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 */
4b628b48
JB
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: BackPressureCallback
  /** @inheritdoc */
  public onEmptyQueue?: EmptyQueueCallback
  /** Tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /**
   * Number of `onEmptyQueue` notifications emitted by the current empty queue
   * notification loop. Reset to 0 when the loop stops.
   */
  private onEmptyQueueCount: number
  /** Per task function name usage statistics, created on first request. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size. Must be a strictly positive safe integer.
   * @throws TypeError if the worker or the tasks queue back pressure size is missing, or if the size is not a safe integer.
   * @throws RangeError if the tasks queue back pressure size is zero or negative.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (tasksQueueBackPressureSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    }
    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    }
    // A size <= 0 would make hasBackPressure() true even on an empty queue,
    // so the back pressure callback would fire on every single enqueue.
    if (tasksQueueBackPressureSize <= 0) {
      throw new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    }
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const messageChannel = this.messageChannel
    if (messageChannel != null) {
      messageChannel.port1.unref()
      messageChannel.port2.unref()
      messageChannel.port1.close()
      messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is accounted under the name stored at index 1 of
    // the task function names list.
    // NOTE(review): presumably index 0 holds the default task name itself - confirm against the code filling `info.taskFunctions`.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctions[1] : name
    let taskFunctionWorkerUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionWorkerUsage == null) {
      taskFunctionWorkerUsage =
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionWorkerUsage)
    }
    return taskFunctionWorkerUsage
  }

  /**
   * Invokes the back pressure callback, if one is registered, when the tasks
   * queue has reached the back pressure size.
   */
  private notifyBackPressure (): void {
    if (this.onBackPressure != null && this.hasBackPressure()) {
      this.onBackPressure(this.info.id as number)
    }
  }

  /**
   * Starts the empty queue notification loop, if a callback is registered,
   * when the tasks queue is empty. The loop runs in the background and its
   * rejections are deliberately ignored.
   */
  private notifyEmptyQueue (): void {
    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Repeatedly invokes the empty queue callback, sleeping between invocations
   * for a delay computed by `exponentialDelay()` from the number of
   * notifications already emitted. After at least one notification, the loop
   * stops and resets the counter as soon as the worker node is executing
   * tasks or has queued tasks.
   *
   * Written as a loop rather than a recursive call so that a long idle period
   * does not accumulate one nested pending promise per notification.
   */
  private async startOnEmptyQueue (): Promise<void> {
    while (true) {
      if (
        this.onEmptyQueueCount > 0 &&
        (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
      ) {
        this.onEmptyQueueCount = 0
        return
      }
      (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
      ++this.onEmptyQueueCount
      await sleep(exponentialDelay(this.onEmptyQueueCount))
    }
  }

  /**
   * Builds the initial worker information: identifier and type are derived
   * from the worker, the node starts neither dynamic nor ready.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * live views on the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // The queue must be read lazily through arrow functions: the constructor
    // calls this method before `tasksQueue` is assigned, and inside the object
    // literal getters below `this` is the literal, not the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` counter is
   * computed on each read by scanning the whole tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function stored at index 1 of the task function names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctions as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }
}