feat: add task function properties support
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
ded253e2 5import { Deque } from '../deque.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f3a91bac 18 type StrategyData,
4b628b48 19 type WorkerInfo,
c3719753 20 type WorkerNodeOptions,
4b628b48
JB
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
d35e5717 24} from './worker.js'
4b628b48 25
60664f48
JB
26/**
27 * Worker node.
28 *
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
4b628b48 32export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 33 extends EventEmitter
9f95d5eb 34 implements IWorkerNode<Worker, Data> {
671d5154 35 /** @inheritdoc */
4b628b48 36 public readonly worker: Worker
671d5154 37 /** @inheritdoc */
4b628b48 38 public readonly info: WorkerInfo
671d5154 39 /** @inheritdoc */
4b628b48 40 public usage: WorkerUsage
20c6f652 41 /** @inheritdoc */
f3a91bac
JB
42 public strategyData?: StrategyData
43 /** @inheritdoc */
26fb3c18
JB
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
20c6f652 46 public tasksQueueBackPressureSize: number
574b351d 47 private readonly tasksQueue: Deque<Task<Data>>
47352846 48 private onBackPressureStarted: boolean
26fb3c18 49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 50
60664f48
JB
51 /**
52 * Constructs a new worker node.
53 *
c3719753 54 * @param type - The worker type.
9974369e 55 * @param filePath - Path to the worker file.
c3719753 56 * @param opts - The worker node options.
60664f48 57 */
c3719753 58 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 59 super()
c3719753
JB
60 checkWorkerNodeArguments(type, filePath, opts)
61 this.worker = createWorker<Worker>(type, filePath, {
62 env: opts.env,
63 workerOptions: opts.workerOptions
64 })
65 this.info = this.initWorkerInfo(this.worker)
26fb3c18 66 this.usage = this.initWorkerUsage()
75de9f41 67 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
68 this.messageChannel = new MessageChannel()
69 }
c63a35a0
JB
70 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
26fb3c18 72 this.tasksQueue = new Deque<Task<Data>>()
47352846 73 this.onBackPressureStarted = false
26fb3c18 74 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
4b628b48
JB
82 /** @inheritdoc */
83 public enqueueTask (task: Task<Data>): number {
72695f86 84 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 85 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 86 this.onBackPressureStarted = true
7f0e1334 87 this.emit('backPressure', { workerId: this.info.id })
47352846 88 this.onBackPressureStarted = false
72695f86
JB
89 }
90 return tasksQueueSize
91 }
92
93 /** @inheritdoc */
94 public unshiftTask (task: Task<Data>): number {
95 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 96 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 97 this.onBackPressureStarted = true
7f0e1334 98 this.emit('backPressure', { workerId: this.info.id })
47352846 99 this.onBackPressureStarted = false
72695f86
JB
100 }
101 return tasksQueueSize
4b628b48
JB
102 }
103
104 /** @inheritdoc */
105 public dequeueTask (): Task<Data> | undefined {
463226a4 106 return this.tasksQueue.shift()
4b628b48
JB
107 }
108
72695f86
JB
109 /** @inheritdoc */
110 public popTask (): Task<Data> | undefined {
463226a4 111 return this.tasksQueue.pop()
72695f86
JB
112 }
113
4b628b48
JB
114 /** @inheritdoc */
115 public clearTasksQueue (): void {
116 this.tasksQueue.clear()
117 }
118
671d5154
JB
119 /** @inheritdoc */
120 public hasBackPressure (): boolean {
8735b4e5 121 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
122 }
123
ff128cc9 124 /** @inheritdoc */
4b628b48
JB
125 public resetUsage (): void {
126 this.usage = this.initWorkerUsage()
db0e38ee 127 this.taskFunctionsUsage.clear()
ff128cc9
JB
128 }
129
3f09ed9f 130 /** @inheritdoc */
07e0c9e5
JB
131 public async terminate (): Promise<void> {
132 const waitWorkerExit = new Promise<void>(resolve => {
133 this.registerOnceWorkerEventHandler('exit', () => {
134 resolve()
135 })
136 })
137 this.closeMessageChannel()
138 this.removeAllListeners()
839b98b8
JB
139 switch (this.info.type) {
140 case WorkerTypes.thread:
d20cde84 141 this.worker.unref?.()
839b98b8
JB
142 await this.worker.terminate?.()
143 break
144 case WorkerTypes.cluster:
145 this.registerOnceWorkerEventHandler('disconnect', () => {
146 this.worker.kill?.()
147 })
148 this.worker.disconnect?.()
149 break
3f09ed9f 150 }
07e0c9e5 151 await waitWorkerExit
3f09ed9f
JB
152 }
153
c3719753
JB
154 /** @inheritdoc */
155 public registerWorkerEventHandler (
156 event: string,
3bcbd4c5 157 handler: EventHandler<Worker>
c3719753 158 ): void {
88af9bf1 159 this.worker.on(event, handler)
c3719753
JB
160 }
161
162 /** @inheritdoc */
163 public registerOnceWorkerEventHandler (
164 event: string,
3bcbd4c5 165 handler: EventHandler<Worker>
c3719753 166 ): void {
88af9bf1 167 this.worker.once(event, handler)
c3719753
JB
168 }
169
ff128cc9 170 /** @inheritdoc */
db0e38ee 171 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
31847469 172 if (!Array.isArray(this.info.taskFunctionsProperties)) {
71b2b6d8 173 throw new Error(
31847469 174 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
71b2b6d8
JB
175 )
176 }
b558f6b5 177 if (
31847469
JB
178 Array.isArray(this.info.taskFunctionsProperties) &&
179 this.info.taskFunctionsProperties.length < 3
b558f6b5 180 ) {
db0e38ee 181 throw new Error(
31847469 182 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
db0e38ee
JB
183 )
184 }
185 if (name === DEFAULT_TASK_NAME) {
31847469 186 name = this.info.taskFunctionsProperties[1].name
b558f6b5 187 }
db0e38ee
JB
188 if (!this.taskFunctionsUsage.has(name)) {
189 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 190 }
db0e38ee 191 return this.taskFunctionsUsage.get(name)
4b628b48
JB
192 }
193
adee6053
JB
194 /** @inheritdoc */
195 public deleteTaskFunctionWorkerUsage (name: string): boolean {
196 return this.taskFunctionsUsage.delete(name)
197 }
198
07e0c9e5
JB
199 private closeMessageChannel (): void {
200 if (this.messageChannel != null) {
201 this.messageChannel.port1.unref()
202 this.messageChannel.port2.unref()
203 this.messageChannel.port1.close()
204 this.messageChannel.port2.close()
205 delete this.messageChannel
206 }
207 }
208
75de9f41 209 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 210 return {
75de9f41 211 id: getWorkerId(worker),
67f3f2d6
JB
212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
213 type: getWorkerType(worker)!,
4b628b48 214 dynamic: false,
5eb72b9e
JB
215 ready: false,
216 stealing: false
4b628b48
JB
217 }
218 }
219
220 private initWorkerUsage (): WorkerUsage {
221 const getTasksQueueSize = (): number => {
dd951876 222 return this.tasksQueue.size
4b628b48 223 }
bf4ef2ca 224 const getTasksQueueMaxSize = (): number => {
dd951876 225 return this.tasksQueue.maxSize
4b628b48
JB
226 }
227 return {
228 tasks: {
229 executed: 0,
230 executing: 0,
231 get queued (): number {
232 return getTasksQueueSize()
233 },
234 get maxQueued (): number {
bf4ef2ca 235 return getTasksQueueMaxSize()
4b628b48 236 },
463226a4 237 sequentiallyStolen: 0,
68cbdc84 238 stolen: 0,
4b628b48
JB
239 failed: 0
240 },
241 runTime: {
c52475b8 242 history: new CircularArray<number>()
4b628b48
JB
243 },
244 waitTime: {
c52475b8 245 history: new CircularArray<number>()
4b628b48
JB
246 },
247 elu: {
248 idle: {
c52475b8 249 history: new CircularArray<number>()
4b628b48
JB
250 },
251 active: {
c52475b8 252 history: new CircularArray<number>()
4b628b48
JB
253 }
254 }
255 }
256 }
257
db0e38ee 258 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
259 const getTaskFunctionQueueSize = (): number => {
260 let taskFunctionQueueSize = 0
b25a42cd 261 for (const task of this.tasksQueue) {
dd92a715 262 if (
e5ece61d 263 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6 264 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
31847469 265 name === this.info.taskFunctionsProperties![1].name) ||
e5ece61d 266 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 267 ) {
e5ece61d 268 ++taskFunctionQueueSize
b25a42cd
JB
269 }
270 }
e5ece61d 271 return taskFunctionQueueSize
b25a42cd
JB
272 }
273 return {
274 tasks: {
275 executed: 0,
276 executing: 0,
277 get queued (): number {
e5ece61d 278 return getTaskFunctionQueueSize()
b25a42cd 279 },
463226a4 280 sequentiallyStolen: 0,
68cbdc84 281 stolen: 0,
b25a42cd
JB
282 failed: 0
283 },
284 runTime: {
c52475b8 285 history: new CircularArray<number>()
b25a42cd
JB
286 },
287 waitTime: {
c52475b8 288 history: new CircularArray<number>()
b25a42cd
JB
289 },
290 elu: {
291 idle: {
c52475b8 292 history: new CircularArray<number>()
b25a42cd
JB
293 },
294 active: {
c52475b8 295 history: new CircularArray<number>()
b25a42cd
JB
296 }
297 }
298 }
299 }
4b628b48 300}