45b4f1ee18c041db8837ecc987c4d5cba38e998d
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 getWorkerId,
9 getWorkerType,
10 sleep
11 } from '../utils'
12 import { Deque } from '../deque'
13 import {
14 type BackPressureCallback,
15 type EmptyQueueCallback,
16 type IWorker,
17 type IWorkerNode,
18 type WorkerInfo,
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22 } from './worker'
23
24 /**
25 * Worker node.
26 *
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 */
30 export class WorkerNode<Worker extends IWorker, Data = unknown>
31 implements IWorkerNode<Worker, Data> {
32 /** @inheritdoc */
33 public readonly worker: Worker
34 /** @inheritdoc */
35 public readonly info: WorkerInfo
36 /** @inheritdoc */
37 public usage: WorkerUsage
38 /** @inheritdoc */
39 public messageChannel?: MessageChannel
40 /** @inheritdoc */
41 public tasksQueueBackPressureSize: number
42 /** @inheritdoc */
43 public onBackPressure?: BackPressureCallback
44 /** @inheritdoc */
45 public onEmptyQueue?: EmptyQueueCallback
46 private readonly tasksQueue: Deque<Task<Data>>
47 private onEmptyQueueCount: number
48 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
49
50 /**
51 * Constructs a new worker node.
52 *
53 * @param worker - The worker.
54 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
55 */
56 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
57 if (worker == null) {
58 throw new TypeError('Cannot construct a worker node without a worker')
59 }
60 if (tasksQueueBackPressureSize == null) {
61 throw new TypeError(
62 'Cannot construct a worker node without a tasks queue back pressure size'
63 )
64 }
65 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
66 throw new TypeError(
67 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
68 )
69 }
70 this.worker = worker
71 this.info = this.initWorkerInfo(worker)
72 this.usage = this.initWorkerUsage()
73 if (this.info.type === WorkerTypes.thread) {
74 this.messageChannel = new MessageChannel()
75 }
76 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
77 this.tasksQueue = new Deque<Task<Data>>()
78 this.onEmptyQueueCount = 0
79 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
80 }
81
82 /** @inheritdoc */
83 public tasksQueueSize (): number {
84 return this.tasksQueue.size
85 }
86
87 /** @inheritdoc */
88 public enqueueTask (task: Task<Data>): number {
89 const tasksQueueSize = this.tasksQueue.push(task)
90 if (this.onBackPressure != null && this.hasBackPressure()) {
91 this.onBackPressure(this.info.id as number)
92 }
93 return tasksQueueSize
94 }
95
96 /** @inheritdoc */
97 public unshiftTask (task: Task<Data>): number {
98 const tasksQueueSize = this.tasksQueue.unshift(task)
99 if (this.onBackPressure != null && this.hasBackPressure()) {
100 this.onBackPressure(this.info.id as number)
101 }
102 return tasksQueueSize
103 }
104
105 /** @inheritdoc */
106 public dequeueTask (): Task<Data> | undefined {
107 const task = this.tasksQueue.shift()
108 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
109 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
110 }
111 return task
112 }
113
114 /** @inheritdoc */
115 public popTask (): Task<Data> | undefined {
116 const task = this.tasksQueue.pop()
117 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
118 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
119 }
120 return task
121 }
122
123 /** @inheritdoc */
124 public clearTasksQueue (): void {
125 this.tasksQueue.clear()
126 }
127
128 /** @inheritdoc */
129 public hasBackPressure (): boolean {
130 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
131 }
132
133 /** @inheritdoc */
134 public resetUsage (): void {
135 this.usage = this.initWorkerUsage()
136 this.taskFunctionsUsage.clear()
137 }
138
139 /** @inheritdoc */
140 public closeChannel (): void {
141 if (this.messageChannel != null) {
142 this.messageChannel?.port1.unref()
143 this.messageChannel?.port2.unref()
144 this.messageChannel?.port1.close()
145 this.messageChannel?.port2.close()
146 delete this.messageChannel
147 }
148 }
149
150 /** @inheritdoc */
151 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
152 if (!Array.isArray(this.info.taskFunctions)) {
153 throw new Error(
154 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
155 )
156 }
157 if (
158 Array.isArray(this.info.taskFunctions) &&
159 this.info.taskFunctions.length < 3
160 ) {
161 throw new Error(
162 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
163 )
164 }
165 if (name === DEFAULT_TASK_NAME) {
166 name = this.info.taskFunctions[1]
167 }
168 if (!this.taskFunctionsUsage.has(name)) {
169 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
170 }
171 return this.taskFunctionsUsage.get(name)
172 }
173
174 private async startOnEmptyQueue (): Promise<void> {
175 if (
176 this.onEmptyQueueCount > 0 &&
177 (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
178 ) {
179 this.onEmptyQueueCount = 0
180 return
181 }
182 (this.onEmptyQueue as EmptyQueueCallback)(this.info.id as number)
183 ++this.onEmptyQueueCount
184 await sleep(exponentialDelay(this.onEmptyQueueCount))
185 await this.startOnEmptyQueue()
186 }
187
188 private initWorkerInfo (worker: Worker): WorkerInfo {
189 return {
190 id: getWorkerId(worker),
191 type: getWorkerType(worker) as WorkerType,
192 dynamic: false,
193 ready: false
194 }
195 }
196
197 private initWorkerUsage (): WorkerUsage {
198 const getTasksQueueSize = (): number => {
199 return this.tasksQueue.size
200 }
201 const getTasksQueueMaxSize = (): number => {
202 return this.tasksQueue.maxSize
203 }
204 return {
205 tasks: {
206 executed: 0,
207 executing: 0,
208 get queued (): number {
209 return getTasksQueueSize()
210 },
211 get maxQueued (): number {
212 return getTasksQueueMaxSize()
213 },
214 stolen: 0,
215 failed: 0
216 },
217 runTime: {
218 history: new CircularArray()
219 },
220 waitTime: {
221 history: new CircularArray()
222 },
223 elu: {
224 idle: {
225 history: new CircularArray()
226 },
227 active: {
228 history: new CircularArray()
229 }
230 }
231 }
232 }
233
234 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
235 const getTaskFunctionQueueSize = (): number => {
236 let taskFunctionQueueSize = 0
237 for (const task of this.tasksQueue) {
238 if (
239 (task.name === DEFAULT_TASK_NAME &&
240 name === (this.info.taskFunctions as string[])[1]) ||
241 (task.name !== DEFAULT_TASK_NAME && name === task.name)
242 ) {
243 ++taskFunctionQueueSize
244 }
245 }
246 return taskFunctionQueueSize
247 }
248 return {
249 tasks: {
250 executed: 0,
251 executing: 0,
252 get queued (): number {
253 return getTaskFunctionQueueSize()
254 },
255 stolen: 0,
256 failed: 0
257 },
258 runTime: {
259 history: new CircularArray()
260 },
261 waitTime: {
262 history: new CircularArray()
263 },
264 elu: {
265 idle: {
266 history: new CircularArray()
267 },
268 active: {
269 history: new CircularArray()
270 }
271 }
272 }
273 }
274 }