Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/express-cluster...
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
d35e5717
JB
3import { CircularArray } from '../circular-array.js'
4import type { Task } from '../utility-types.js'
5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6import { Deque } from '../deque.js'
4b628b48 7import {
3bcbd4c5 8 type EventHandler,
4b628b48
JB
9 type IWorker,
10 type IWorkerNode,
f3a91bac 11 type StrategyData,
4b628b48 12 type WorkerInfo,
c3719753 13 type WorkerNodeOptions,
4b628b48
JB
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
d35e5717
JB
17} from './worker.js'
18import { checkWorkerNodeArguments, createWorker } from './utils.js'
4b628b48 19
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its info, its usage statistics and its own
 * tasks queue. A `backPressure` event is emitted when the tasks queue size
 * reaches `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Re-entrancy guard: `true` while a `backPressure` event is being emitted. */
  private onBackPressureStarted: boolean
  /** Per task function usage statistics, keyed by task function name. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions
    })
    this.info = this.initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // The 'exit' handler is registered before any termination request is
    // issued so that the worker exit cannot be missed.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this worker node emitter, not the
    // ones registered on the underlying worker.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.thread:
        await this.worker.terminate?.()
        break
      case WorkerTypes.cluster:
        // The cluster worker is killed once it has emitted 'disconnect'.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
    }
    await waitWorkerExit
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is accounted under the second entry of the task
    // function names list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      // Usage statistics are created lazily on first access.
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits a `backPressure` event carrying this worker node id when the tasks
   * queue has back pressure and no emission is already in progress.
   *
   * The in-progress flag is always reset, even when a listener throws, so
   * that a failing listener cannot permanently suppress later events. The
   * listener error still propagates to the caller.
   */
  private emitBackPressure (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id })
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Unreferences and closes both ports of the message channel, if any, then
   * removes the `messageChannel` property.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Builds the initial worker info: not dynamic, not ready and not stealing.
   *
   * @param worker - The worker to get the id and the type from.
   * @returns The initial worker info.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      type: getWorkerType(worker)!,
      dynamic: false,
      ready: false,
      stealing: false
    }
  }

  /**
   * Builds zeroed worker usage statistics.
   *
   * `tasks.queued` and `tasks.maxQueued` are getters reading the tasks queue
   * at access time; they are not snapshots.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node `this`: inside the object
    // literal getters below, `this` is the `tasks` object.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function.
   *
   * `tasks.queued` is a getter counting, at access time, the queued tasks
   * that belong to the task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Name under which tasks using the default task name are counted.
      // It is `undefined`, hence never matching, if the task function names
      // list is not defined.
      const defaultTaskFunctionName = this.info.taskFunctionNames?.[1]
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          (task.name === DEFAULT_TASK_NAME &&
            name === defaultTaskFunctionName) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}