refactor: cleanup dead code
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
ded253e2 5import { Deque } from '../deque.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f3a91bac 18 type StrategyData,
4b628b48 19 type WorkerInfo,
c3719753 20 type WorkerNodeOptions,
4b628b48
JB
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
d35e5717 24} from './worker.js'
4b628b48 25
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics, its tasks
 * queue and, for thread workers, a message channel. The node is itself an
 * event emitter: it emits a `backPressure` event when its tasks queue size
 * reaches `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether a `backPressure` event emission is currently in progress. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created lazily. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before initiating the shutdown so that the
    // event cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node emitter, not the
    // ones registered on the worker itself.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // The worker is killed only once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name of the second entry of
    // the task function properties list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits a `backPressure` event carrying the worker id when the tasks queue
   * has back pressure and no emission is already in progress.
   *
   * The in-progress flag is reset in a `finally` clause: a throwing listener
   * still propagates its error to the caller, but it can no longer leave the
   * flag set and suppress every subsequent `backPressure` event.
   */
  private emitBackPressureIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        this.emit('backPressure', { workerId: this.info.id })
      } finally {
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the channel from the worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker to describe.
   * @returns The worker information, flagged as not dynamic, not ready and not stealing.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds the initial worker usage statistics.
   *
   * @returns Zeroed usage statistics whose `queued` and `maxQueued` values are read live from the tasks queue.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the accessors below,
    // `this` is the `tasks` object.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds the initial usage statistics of a task function.
   *
   * @param name - The task function name.
   * @returns Zeroed usage statistics whose `queued` value is computed live by counting the matching tasks in the tasks queue.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Tasks queued under the default task name count for this task function
      // only when it is the second entry of the task function properties
      // list. Read once per evaluation; optional chaining keeps the accessor
      // from throwing if the list is not defined.
      const isDefaultTaskFunction =
        name === this.info.taskFunctionsProperties?.[1]?.name
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          task.name === DEFAULT_TASK_NAME
            ? isDefaultTaskFunction
            : task.name === name
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}