fix: ensure `worker_threads` workers are unreferenced at termination
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
e9ed6eee 5import { DEFAULT_TASK_NAME } from '../utils.js'
d35e5717 6import { Deque } from '../deque.js'
4b628b48 7import {
3bcbd4c5 8 type EventHandler,
4b628b48
JB
9 type IWorker,
10 type IWorkerNode,
f3a91bac 11 type StrategyData,
4b628b48 12 type WorkerInfo,
c3719753 13 type WorkerNodeOptions,
4b628b48
JB
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
d35e5717 17} from './worker.js'
e9ed6eee
JB
18import {
19 checkWorkerNodeArguments,
20 createWorker,
21 getWorkerId,
22 getWorkerType
23} from './utils.js'
4b628b48 24
60664f48
JB
25/**
26 * Worker node.
27 *
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 */
4b628b48 31export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 32 extends EventEmitter
9f95d5eb 33 implements IWorkerNode<Worker, Data> {
671d5154 34 /** @inheritdoc */
4b628b48 35 public readonly worker: Worker
671d5154 36 /** @inheritdoc */
4b628b48 37 public readonly info: WorkerInfo
671d5154 38 /** @inheritdoc */
4b628b48 39 public usage: WorkerUsage
20c6f652 40 /** @inheritdoc */
f3a91bac
JB
41 public strategyData?: StrategyData
42 /** @inheritdoc */
26fb3c18
JB
43 public messageChannel?: MessageChannel
44 /** @inheritdoc */
20c6f652 45 public tasksQueueBackPressureSize: number
574b351d 46 private readonly tasksQueue: Deque<Task<Data>>
47352846 47 private onBackPressureStarted: boolean
26fb3c18 48 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 49
60664f48
JB
50 /**
51 * Constructs a new worker node.
52 *
c3719753 53 * @param type - The worker type.
9974369e 54 * @param filePath - Path to the worker file.
c3719753 55 * @param opts - The worker node options.
60664f48 56 */
c3719753 57 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 58 super()
c3719753
JB
59 checkWorkerNodeArguments(type, filePath, opts)
60 this.worker = createWorker<Worker>(type, filePath, {
61 env: opts.env,
62 workerOptions: opts.workerOptions
63 })
64 this.info = this.initWorkerInfo(this.worker)
26fb3c18 65 this.usage = this.initWorkerUsage()
75de9f41 66 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
67 this.messageChannel = new MessageChannel()
68 }
c63a35a0
JB
69 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
70 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
26fb3c18 71 this.tasksQueue = new Deque<Task<Data>>()
47352846 72 this.onBackPressureStarted = false
26fb3c18 73 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
74 }
75
76 /** @inheritdoc */
77 public tasksQueueSize (): number {
78 return this.tasksQueue.size
79 }
80
4b628b48
JB
81 /** @inheritdoc */
82 public enqueueTask (task: Task<Data>): number {
72695f86 83 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 84 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 85 this.onBackPressureStarted = true
7f0e1334 86 this.emit('backPressure', { workerId: this.info.id })
47352846 87 this.onBackPressureStarted = false
72695f86
JB
88 }
89 return tasksQueueSize
90 }
91
92 /** @inheritdoc */
93 public unshiftTask (task: Task<Data>): number {
94 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 95 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 96 this.onBackPressureStarted = true
7f0e1334 97 this.emit('backPressure', { workerId: this.info.id })
47352846 98 this.onBackPressureStarted = false
72695f86
JB
99 }
100 return tasksQueueSize
4b628b48
JB
101 }
102
103 /** @inheritdoc */
104 public dequeueTask (): Task<Data> | undefined {
463226a4 105 return this.tasksQueue.shift()
4b628b48
JB
106 }
107
72695f86
JB
108 /** @inheritdoc */
109 public popTask (): Task<Data> | undefined {
463226a4 110 return this.tasksQueue.pop()
72695f86
JB
111 }
112
4b628b48
JB
113 /** @inheritdoc */
114 public clearTasksQueue (): void {
115 this.tasksQueue.clear()
116 }
117
671d5154
JB
118 /** @inheritdoc */
119 public hasBackPressure (): boolean {
8735b4e5 120 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
121 }
122
ff128cc9 123 /** @inheritdoc */
4b628b48
JB
124 public resetUsage (): void {
125 this.usage = this.initWorkerUsage()
db0e38ee 126 this.taskFunctionsUsage.clear()
ff128cc9
JB
127 }
128
3f09ed9f 129 /** @inheritdoc */
07e0c9e5
JB
130 public async terminate (): Promise<void> {
131 const waitWorkerExit = new Promise<void>(resolve => {
132 this.registerOnceWorkerEventHandler('exit', () => {
133 resolve()
134 })
135 })
136 this.closeMessageChannel()
137 this.removeAllListeners()
839b98b8
JB
138 switch (this.info.type) {
139 case WorkerTypes.thread:
d20cde84 140 this.worker.unref?.()
839b98b8
JB
141 await this.worker.terminate?.()
142 break
143 case WorkerTypes.cluster:
144 this.registerOnceWorkerEventHandler('disconnect', () => {
145 this.worker.kill?.()
146 })
147 this.worker.disconnect?.()
148 break
3f09ed9f 149 }
07e0c9e5 150 await waitWorkerExit
3f09ed9f
JB
151 }
152
c3719753
JB
153 /** @inheritdoc */
154 public registerWorkerEventHandler (
155 event: string,
3bcbd4c5 156 handler: EventHandler<Worker>
c3719753 157 ): void {
88af9bf1 158 this.worker.on(event, handler)
c3719753
JB
159 }
160
161 /** @inheritdoc */
162 public registerOnceWorkerEventHandler (
163 event: string,
3bcbd4c5 164 handler: EventHandler<Worker>
c3719753 165 ): void {
88af9bf1 166 this.worker.once(event, handler)
c3719753
JB
167 }
168
ff128cc9 169 /** @inheritdoc */
db0e38ee 170 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 171 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 172 throw new Error(
db0e38ee 173 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
174 )
175 }
b558f6b5 176 if (
6703b9f4
JB
177 Array.isArray(this.info.taskFunctionNames) &&
178 this.info.taskFunctionNames.length < 3
b558f6b5 179 ) {
db0e38ee
JB
180 throw new Error(
181 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
182 )
183 }
184 if (name === DEFAULT_TASK_NAME) {
6703b9f4 185 name = this.info.taskFunctionNames[1]
b558f6b5 186 }
db0e38ee
JB
187 if (!this.taskFunctionsUsage.has(name)) {
188 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 189 }
db0e38ee 190 return this.taskFunctionsUsage.get(name)
4b628b48
JB
191 }
192
adee6053
JB
193 /** @inheritdoc */
194 public deleteTaskFunctionWorkerUsage (name: string): boolean {
195 return this.taskFunctionsUsage.delete(name)
196 }
197
07e0c9e5
JB
198 private closeMessageChannel (): void {
199 if (this.messageChannel != null) {
200 this.messageChannel.port1.unref()
201 this.messageChannel.port2.unref()
202 this.messageChannel.port1.close()
203 this.messageChannel.port2.close()
204 delete this.messageChannel
205 }
206 }
207
75de9f41 208 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 209 return {
75de9f41 210 id: getWorkerId(worker),
67f3f2d6
JB
211 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
212 type: getWorkerType(worker)!,
4b628b48 213 dynamic: false,
5eb72b9e
JB
214 ready: false,
215 stealing: false
4b628b48
JB
216 }
217 }
218
219 private initWorkerUsage (): WorkerUsage {
220 const getTasksQueueSize = (): number => {
dd951876 221 return this.tasksQueue.size
4b628b48 222 }
bf4ef2ca 223 const getTasksQueueMaxSize = (): number => {
dd951876 224 return this.tasksQueue.maxSize
4b628b48
JB
225 }
226 return {
227 tasks: {
228 executed: 0,
229 executing: 0,
230 get queued (): number {
231 return getTasksQueueSize()
232 },
233 get maxQueued (): number {
bf4ef2ca 234 return getTasksQueueMaxSize()
4b628b48 235 },
463226a4 236 sequentiallyStolen: 0,
68cbdc84 237 stolen: 0,
4b628b48
JB
238 failed: 0
239 },
240 runTime: {
c52475b8 241 history: new CircularArray<number>()
4b628b48
JB
242 },
243 waitTime: {
c52475b8 244 history: new CircularArray<number>()
4b628b48
JB
245 },
246 elu: {
247 idle: {
c52475b8 248 history: new CircularArray<number>()
4b628b48
JB
249 },
250 active: {
c52475b8 251 history: new CircularArray<number>()
4b628b48
JB
252 }
253 }
254 }
255 }
256
db0e38ee 257 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
258 const getTaskFunctionQueueSize = (): number => {
259 let taskFunctionQueueSize = 0
b25a42cd 260 for (const task of this.tasksQueue) {
dd92a715 261 if (
e5ece61d 262 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6
JB
263 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
264 name === this.info.taskFunctionNames![1]) ||
e5ece61d 265 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 266 ) {
e5ece61d 267 ++taskFunctionQueueSize
b25a42cd
JB
268 }
269 }
e5ece61d 270 return taskFunctionQueueSize
b25a42cd
JB
271 }
272 return {
273 tasks: {
274 executed: 0,
275 executing: 0,
276 get queued (): number {
e5ece61d 277 return getTaskFunctionQueueSize()
b25a42cd 278 },
463226a4 279 sequentiallyStolen: 0,
68cbdc84 280 stolen: 0,
b25a42cd
JB
281 failed: 0
282 },
283 runTime: {
c52475b8 284 history: new CircularArray<number>()
b25a42cd
JB
285 },
286 waitTime: {
c52475b8 287 history: new CircularArray<number>()
b25a42cd
JB
288 },
289 elu: {
290 idle: {
c52475b8 291 history: new CircularArray<number>()
b25a42cd
JB
292 },
293 active: {
c52475b8 294 history: new CircularArray<number>()
b25a42cd
JB
295 }
296 }
297 }
298 }
4b628b48 299}