fix: refine pool statuses handling
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
4b628b48 3import { CircularArray } from '../circular-array'
5c4d16da 4import type { Task } from '../utility-types'
463226a4 5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 6import { Deque } from '../deque'
4b628b48 7import {
c3719753
JB
8 type ErrorHandler,
9 type ExitHandler,
4b628b48
JB
10 type IWorker,
11 type IWorkerNode,
c3719753
JB
12 type MessageHandler,
13 type OnlineHandler,
f3a91bac 14 type StrategyData,
4b628b48 15 type WorkerInfo,
c3719753 16 type WorkerNodeOptions,
4b628b48
JB
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20} from './worker'
c3719753 21import { checkWorkerNodeArguments, createWorker } from './utils'
4b628b48 22
60664f48
JB
23/**
24 * Worker node.
25 *
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
28 */
4b628b48 29export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 30 extends EventEmitter
9f95d5eb 31 implements IWorkerNode<Worker, Data> {
671d5154 32 /** @inheritdoc */
4b628b48 33 public readonly worker: Worker
671d5154 34 /** @inheritdoc */
4b628b48 35 public readonly info: WorkerInfo
671d5154 36 /** @inheritdoc */
4b628b48 37 public usage: WorkerUsage
20c6f652 38 /** @inheritdoc */
f3a91bac
JB
39 public strategyData?: StrategyData
40 /** @inheritdoc */
26fb3c18
JB
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
20c6f652 43 public tasksQueueBackPressureSize: number
574b351d 44 private readonly tasksQueue: Deque<Task<Data>>
47352846 45 private onBackPressureStarted: boolean
26fb3c18 46 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 47
60664f48
JB
48 /**
49 * Constructs a new worker node.
50 *
c3719753 51 * @param type - The worker type.
9974369e 52 * @param filePath - Path to the worker file.
c3719753 53 * @param opts - The worker node options.
60664f48 54 */
c3719753 55 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 56 super()
c3719753
JB
57 checkWorkerNodeArguments(type, filePath, opts)
58 this.worker = createWorker<Worker>(type, filePath, {
59 env: opts.env,
60 workerOptions: opts.workerOptions
61 })
62 this.info = this.initWorkerInfo(this.worker)
26fb3c18 63 this.usage = this.initWorkerUsage()
75de9f41 64 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
65 this.messageChannel = new MessageChannel()
66 }
c3719753 67 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
26fb3c18 68 this.tasksQueue = new Deque<Task<Data>>()
47352846 69 this.onBackPressureStarted = false
26fb3c18 70 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
71 }
72
73 /** @inheritdoc */
74 public tasksQueueSize (): number {
75 return this.tasksQueue.size
76 }
77
4b628b48
JB
78 /** @inheritdoc */
79 public enqueueTask (task: Task<Data>): number {
72695f86 80 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 81 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 82 this.onBackPressureStarted = true
e1c2dba7 83 this.emit('backPressure', { workerId: this.info.id as number })
47352846 84 this.onBackPressureStarted = false
72695f86
JB
85 }
86 return tasksQueueSize
87 }
88
89 /** @inheritdoc */
90 public unshiftTask (task: Task<Data>): number {
91 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 92 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 93 this.onBackPressureStarted = true
e1c2dba7 94 this.emit('backPressure', { workerId: this.info.id as number })
47352846 95 this.onBackPressureStarted = false
72695f86
JB
96 }
97 return tasksQueueSize
4b628b48
JB
98 }
99
100 /** @inheritdoc */
101 public dequeueTask (): Task<Data> | undefined {
463226a4 102 return this.tasksQueue.shift()
4b628b48
JB
103 }
104
72695f86
JB
105 /** @inheritdoc */
106 public popTask (): Task<Data> | undefined {
463226a4 107 return this.tasksQueue.pop()
72695f86
JB
108 }
109
4b628b48
JB
110 /** @inheritdoc */
111 public clearTasksQueue (): void {
112 this.tasksQueue.clear()
113 }
114
671d5154
JB
115 /** @inheritdoc */
116 public hasBackPressure (): boolean {
8735b4e5 117 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
118 }
119
ff128cc9 120 /** @inheritdoc */
4b628b48
JB
121 public resetUsage (): void {
122 this.usage = this.initWorkerUsage()
db0e38ee 123 this.taskFunctionsUsage.clear()
ff128cc9
JB
124 }
125
3f09ed9f 126 /** @inheritdoc */
07e0c9e5
JB
127 public async terminate (): Promise<void> {
128 const waitWorkerExit = new Promise<void>(resolve => {
129 this.registerOnceWorkerEventHandler('exit', () => {
130 resolve()
131 })
132 })
133 this.closeMessageChannel()
134 this.removeAllListeners()
839b98b8
JB
135 switch (this.info.type) {
136 case WorkerTypes.thread:
137 await this.worker.terminate?.()
138 break
139 case WorkerTypes.cluster:
140 this.registerOnceWorkerEventHandler('disconnect', () => {
141 this.worker.kill?.()
142 })
143 this.worker.disconnect?.()
144 break
3f09ed9f 145 }
07e0c9e5 146 await waitWorkerExit
3f09ed9f
JB
147 }
148
c3719753
JB
149 /** @inheritdoc */
150 public registerWorkerEventHandler (
151 event: string,
88af9bf1 152 handler:
c3719753
JB
153 | OnlineHandler<Worker>
154 | MessageHandler<Worker>
155 | ErrorHandler<Worker>
156 | ExitHandler<Worker>
157 ): void {
88af9bf1 158 this.worker.on(event, handler)
c3719753
JB
159 }
160
161 /** @inheritdoc */
162 public registerOnceWorkerEventHandler (
163 event: string,
88af9bf1 164 handler:
c3719753
JB
165 | OnlineHandler<Worker>
166 | MessageHandler<Worker>
167 | ErrorHandler<Worker>
168 | ExitHandler<Worker>
169 ): void {
88af9bf1 170 this.worker.once(event, handler)
c3719753
JB
171 }
172
ff128cc9 173 /** @inheritdoc */
db0e38ee 174 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 175 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 176 throw new Error(
db0e38ee 177 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
178 )
179 }
b558f6b5 180 if (
6703b9f4
JB
181 Array.isArray(this.info.taskFunctionNames) &&
182 this.info.taskFunctionNames.length < 3
b558f6b5 183 ) {
db0e38ee
JB
184 throw new Error(
185 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
186 )
187 }
188 if (name === DEFAULT_TASK_NAME) {
6703b9f4 189 name = this.info.taskFunctionNames[1]
b558f6b5 190 }
db0e38ee
JB
191 if (!this.taskFunctionsUsage.has(name)) {
192 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 193 }
db0e38ee 194 return this.taskFunctionsUsage.get(name)
4b628b48
JB
195 }
196
adee6053
JB
197 /** @inheritdoc */
198 public deleteTaskFunctionWorkerUsage (name: string): boolean {
199 return this.taskFunctionsUsage.delete(name)
200 }
201
07e0c9e5
JB
202 private closeMessageChannel (): void {
203 if (this.messageChannel != null) {
204 this.messageChannel.port1.unref()
205 this.messageChannel.port2.unref()
206 this.messageChannel.port1.close()
207 this.messageChannel.port2.close()
208 delete this.messageChannel
209 }
210 }
211
75de9f41 212 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 213 return {
75de9f41
JB
214 id: getWorkerId(worker),
215 type: getWorkerType(worker) as WorkerType,
4b628b48 216 dynamic: false,
5eb72b9e
JB
217 ready: false,
218 stealing: false
4b628b48
JB
219 }
220 }
221
222 private initWorkerUsage (): WorkerUsage {
223 const getTasksQueueSize = (): number => {
dd951876 224 return this.tasksQueue.size
4b628b48 225 }
bf4ef2ca 226 const getTasksQueueMaxSize = (): number => {
dd951876 227 return this.tasksQueue.maxSize
4b628b48
JB
228 }
229 return {
230 tasks: {
231 executed: 0,
232 executing: 0,
233 get queued (): number {
234 return getTasksQueueSize()
235 },
236 get maxQueued (): number {
bf4ef2ca 237 return getTasksQueueMaxSize()
4b628b48 238 },
463226a4 239 sequentiallyStolen: 0,
68cbdc84 240 stolen: 0,
4b628b48
JB
241 failed: 0
242 },
243 runTime: {
c52475b8 244 history: new CircularArray<number>()
4b628b48
JB
245 },
246 waitTime: {
c52475b8 247 history: new CircularArray<number>()
4b628b48
JB
248 },
249 elu: {
250 idle: {
c52475b8 251 history: new CircularArray<number>()
4b628b48
JB
252 },
253 active: {
c52475b8 254 history: new CircularArray<number>()
4b628b48
JB
255 }
256 }
257 }
258 }
259
db0e38ee 260 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
261 const getTaskFunctionQueueSize = (): number => {
262 let taskFunctionQueueSize = 0
b25a42cd 263 for (const task of this.tasksQueue) {
dd92a715 264 if (
e5ece61d 265 (task.name === DEFAULT_TASK_NAME &&
6703b9f4 266 name === (this.info.taskFunctionNames as string[])[1]) ||
e5ece61d 267 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 268 ) {
e5ece61d 269 ++taskFunctionQueueSize
b25a42cd
JB
270 }
271 }
e5ece61d 272 return taskFunctionQueueSize
b25a42cd
JB
273 }
274 return {
275 tasks: {
276 executed: 0,
277 executing: 0,
278 get queued (): number {
e5ece61d 279 return getTaskFunctionQueueSize()
b25a42cd 280 },
463226a4 281 sequentiallyStolen: 0,
68cbdc84 282 stolen: 0,
b25a42cd
JB
283 failed: 0
284 },
285 runTime: {
c52475b8 286 history: new CircularArray<number>()
b25a42cd
JB
287 },
288 waitTime: {
c52475b8 289 history: new CircularArray<number>()
b25a42cd
JB
290 },
291 elu: {
292 idle: {
c52475b8 293 history: new CircularArray<number>()
b25a42cd
JB
294 },
295 active: {
c52475b8 296 history: new CircularArray<number>()
b25a42cd
JB
297 }
298 }
299 }
300 }
4b628b48 301}