refactor: factor out worker node termination code
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
4b628b48 3import { CircularArray } from '../circular-array'
5c4d16da 4import type { Task } from '../utility-types'
463226a4 5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 6import { Deque } from '../deque'
4b628b48 7import {
c3719753
JB
8 type ErrorHandler,
9 type ExitHandler,
4b628b48
JB
10 type IWorker,
11 type IWorkerNode,
c3719753
JB
12 type MessageHandler,
13 type OnlineHandler,
f3a91bac 14 type StrategyData,
4b628b48 15 type WorkerInfo,
c3719753 16 type WorkerNodeOptions,
4b628b48
JB
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20} from './worker'
c3719753 21import { checkWorkerNodeArguments, createWorker } from './utils'
4b628b48 22
60664f48
JB
/**
 * Worker node.
 *
 * Couples one worker with its information record, its usage statistics
 * (global and per task function) and its queue of pending tasks. The node is
 * itself an event emitter and emits a `backPressure` event when the queue
 * reaches the configured back pressure size.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Pending tasks of this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Set while a `backPressure` event is being emitted. */
  private onBackPressureStarted: boolean
  /** Usage statistics keyed by task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - The worker file path.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    const workerCreationOptions = {
      env: opts.env,
      workerOptions: opts.workerOptions
    }
    this.worker = createWorker<Worker>(type, filePath, workerCreationOptions)
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    const { size } = this.tasksQueue
    return size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const queueSizeAfterPush = this.tasksQueue.push(task)
    this.signalBackPressure()
    return queueSizeAfterPush
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const queueSizeAfterUnshift = this.tasksQueue.unshift(task)
    this.signalBackPressure()
    return queueSizeAfterUnshift
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const headTask = this.tasksQueue.shift()
    return headTask
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const tailTask = this.tasksQueue.pop()
    return tailTask
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    const { size } = this.tasksQueue
    return size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The exit listener is attached first so the promise is already pending
    // on the worker 'exit' event before any teardown step runs.
    const workerExited = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners attached to this node (the event emitter), not
    // the handlers registered on the worker itself.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // Kill the worker only once it has reported its disconnection.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await workerExited
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    listener:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    // Persistent registration: the listener stays attached to the worker.
    this.worker.on(event, listener)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    listener:
    | OnlineHandler<Worker>
    | MessageHandler<Worker>
    | ErrorHandler<Worker>
    | ExitHandler<Worker>
  ): void {
    // One-shot registration on the worker.
    this.worker.once(event, listener)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const { taskFunctionNames } = this.info
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the second entry of the task
    // function names list.
    const usageKey =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(usageKey)) {
      this.taskFunctionsUsage.set(
        usageKey,
        this.initTaskFunctionWorkerUsage(usageKey)
      )
    }
    return this.taskFunctionsUsage.get(usageKey)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    const deleted = this.taskFunctionsUsage.delete(name)
    return deleted
  }

  /**
   * Emits the `backPressure` event, carrying this worker id, when the tasks
   * queue has back pressure and no such emission is already in progress.
   *
   * The in-progress flag is raised for the duration of the emit call and
   * lowered right after it, so a nested call made while the event is being
   * emitted does not emit again.
   */
  private signalBackPressure (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    this.emit('backPressure', { workerId: this.info.id as number })
    this.onBackPressureStarted = false
  }

  /**
   * Unreferences then closes both ports of the message channel, and removes
   * the channel from this node. Does nothing when there is no channel.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel == null) {
      return
    }
    const { port1, port2 } = this.messageChannel
    port1.unref()
    port2.unref()
    port1.close()
    port2.close()
    delete this.messageChannel
  }

  /**
   * Builds the initial information record of the given worker.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    const initialInfo: WorkerInfo = {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
    return initialInfo
  }

  /**
   * Builds a zeroed worker usage whose queued tasks figures are read live
   * from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node: inside the
    // accessors below `this` is the object literal. The queue is looked up on
    // each call, not captured here, because the constructor calls this method
    // before the queue is assigned.
    const currentQueueSize = (): number => this.tasksQueue.size
    const maximumQueueSize = (): number => this.tasksQueue.maxSize
    const initialUsage: WorkerUsage = {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return currentQueueSize()
        },
        get maxQueued (): number {
          return maximumQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
    return initialUsage
  }

  /**
   * Builds a zeroed worker usage for one task function. Its queued tasks
   * figure is computed on access by counting the matching tasks in the queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const countQueuedTasks = (): number => {
      let matchingTasks = 0
      for (const { name: taskName } of this.tasksQueue) {
        // A task carrying the default task name counts for the task function
        // listed second in the task function names list.
        const effectiveTaskName =
          taskName === DEFAULT_TASK_NAME
            ? (this.info.taskFunctionNames as string[])[1]
            : taskName
        if (name === effectiveTaskName) {
          ++matchingTasks
        }
      }
      return matchingTasks
    }
    const initialUsage: WorkerUsage = {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return countQueuedTasks()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
    return initialUsage
  }
}