Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48
JB
13import {
14 type IWorker,
15 type IWorkerNode,
4b628b48 16 type WorkerInfo,
a9780ad2 17 type WorkerNodeEventCallback,
4b628b48
JB
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21} from './worker'
22
60664f48
JB
/**
 * Worker node.
 *
 * Couples a worker with its information, its usage statistics, its tasks queue
 * and, for thread workers, a dedicated message channel.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: WorkerNodeEventCallback
  /** @inheritdoc */
  public onEmptyQueue?: WorkerNodeEventCallback
  /** Tasks waiting to be executed by the worker. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Number of consecutive `onEmptyQueue` notifications sent by the running empty queue loop. */
  private onEmptyQueueCount: number
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws TypeError If the worker is missing or the back pressure size is missing or not a safe integer.
   * @throws RangeError If the back pressure size is not strictly positive.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    // Notify after the insertion so the callback observes the updated queue.
    if (this.onBackPressure != null && this.hasBackPressure()) {
      this.onBackPressure(this.info.id as number)
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    // Notify after the insertion so the callback observes the updated queue.
    if (this.onBackPressure != null && this.hasBackPressure()) {
      this.onBackPressure(this.info.id as number)
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
      // Fire and forget: failures of the notification loop are deliberately ignored.
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
      // Fire and forget: failures of the notification loop are deliberately ignored.
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      const { port1, port2 } = this.messageChannel
      port1.unref()
      port2.unref()
      port1.close()
      port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of the list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctions[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /**
   * Notifies `onEmptyQueue` repeatedly, sleeping `exponentialDelay(onEmptyQueueCount)`
   * between notifications, until at least one notification was sent and the
   * worker node has executing or queued tasks again, or until the callback is unset.
   *
   * Implemented as a loop rather than awaited recursion so that a long idle
   * period does not build an ever-growing chain of pending promises.
   */
  private async startOnEmptyQueue (): Promise<void> {
    while (this.onEmptyQueue != null) {
      if (
        this.onEmptyQueueCount > 0 &&
        (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
      ) {
        break
      }
      this.onEmptyQueue(this.info.id as number)
      ++this.onEmptyQueueCount
      await sleep(exponentialDelay(this.onEmptyQueueCount))
    }
    this.onEmptyQueueCount = 0
  }

  /**
   * Builds the initial worker information: identifier and type are derived
   * from the worker, `dynamic` and `ready` start as `false`.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics whose `queued` and `maxQueued`
   * counters are live views over the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node (inside the object
    // literal getters `this` is the `tasks` object) and read the queue lazily:
    // the constructor calls this method before `tasksQueue` is assigned.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function. Its `queued` counter
   * is computed on each read by counting the matching tasks in the queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          // Tasks queued under the default name count for the task function
          // stored at index 1 of the task function names list.
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctions as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Validates the constructor arguments.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws TypeError If the worker is `null` or `undefined`, or if the back pressure size is `null`, `undefined` or not a safe integer.
   * @throws RangeError If the back pressure size is zero or negative.
   */
  private checkWorkerNodeArguments (
    worker: Worker,
    tasksQueueBackPressureSize: number
  ): void {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (tasksQueueBackPressureSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    }
    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    }
    if (tasksQueueBackPressureSize <= 0) {
      throw new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    }
  }
}