Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 getWorkerId,
9 getWorkerType,
10 sleep
11 } from '../utils'
12 import { Deque } from '../deque'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type WorkerInfo,
17 type WorkerNodeEventCallback,
18 type WorkerType,
19 WorkerTypes,
20 type WorkerUsage
21 } from './worker'
22
/**
 * Worker node.
 *
 * Wraps a worker together with its runtime information, its usage statistics,
 * an optional message channel (created for thread workers only) and a
 * double-ended tasks queue that raises back pressure and empty queue events.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: WorkerNodeEventCallback
  /** @inheritdoc */
  public onEmptyQueue?: WorkerNodeEventCallback
  /** Tasks waiting to be handed over to the worker. */
  private readonly tasksQueue: Deque<Task<Data>>
  /**
   * Number of empty queue notifications emitted by the running empty queue
   * polling loop. It is `0` when no polling loop is running.
   */
  private onEmptyQueueCount: number
  /** Usage statistics per task function name, created lazily on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws {TypeError} If the worker is missing or the back pressure size is missing or not a safe integer.
   * @throws {RangeError} If the back pressure size is not strictly positive.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    // The usage getters read the tasks queue lazily, so building the usage
    // before the queue is assigned below is safe.
    this.usage = this.initWorkerUsage()
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.notifyEmptyQueue()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const channel = this.messageChannel
    if (channel != null) {
      channel.port1.unref()
      channel.port2.unref()
      channel.port1.close()
      channel.port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is accounted under the name stored at index 1 of
    // the task function names list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctions[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /**
   * Invokes the back pressure callback, if any, when the tasks queue has
   * reached the back pressure size.
   */
  private notifyBackPressure (): void {
    if (this.onBackPressure != null && this.hasBackPressure()) {
      this.onBackPressure(this.info.id as number)
    }
  }

  /**
   * Starts the empty queue polling loop when the tasks queue is empty, a
   * callback is registered and no polling loop is already running.
   */
  private notifyEmptyQueue (): void {
    if (
      this.onEmptyQueue != null &&
      this.tasksQueue.size === 0 &&
      // A non-zero count means a polling loop is already running: starting
      // another one would duplicate notifications and corrupt the count.
      this.onEmptyQueueCount === 0
    ) {
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Empty queue polling loop: invokes the empty queue callback, then waits for
   * an exponentially growing delay, until the worker node has work again
   * (tasks executing or queued) or the callback is unregistered.
   *
   * The callback is always invoked at least once per started loop.
   */
  private async startOnEmptyQueue (): Promise<void> {
    try {
      while (
        this.onEmptyQueue != null &&
        !(
          this.onEmptyQueueCount > 0 &&
          (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
        )
      ) {
        // Incremented before the callback so that a re-entrant dequeue from
        // the callback cannot start a second polling loop.
        ++this.onEmptyQueueCount
        this.onEmptyQueue(this.info.id as number)
        await sleep(exponentialDelay(this.onEmptyQueueCount))
      }
    } finally {
      // Always mark the loop as stopped, even if the callback threw, so that
      // a later empty queue can start a new loop.
      this.onEmptyQueueCount = 0
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * live views on the tasks queue.
   *
   * @returns The worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Closures are required: inside the getters `this` is the `tasks` object,
    // and the tasks queue must be read lazily since it may not be assigned yet
    // when this method is called from the constructor.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function whose `queued` counter counts,
   * on each read, the queued tasks targeting that task function.
   *
   * @param name - The task function name.
   * @returns The task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // Tasks queued under the default name count for the task function
        // stored at index 1 of the task function names list.
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME
            ? (this.info.taskFunctions as string[])[1]
            : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Validates the constructor arguments.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws {TypeError} If the worker is missing or the back pressure size is missing or not a safe integer.
   * @throws {RangeError} If the back pressure size is not strictly positive.
   */
  private checkWorkerNodeArguments (
    worker: Worker,
    tasksQueueBackPressureSize: number
  ): void {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (tasksQueueBackPressureSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    }
    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    }
    if (tasksQueueBackPressureSize <= 0) {
      throw new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    }
  }
}