refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
4b628b48 3import { CircularArray } from '../circular-array'
5c4d16da 4import type { Task } from '../utility-types'
463226a4 5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 6import { Deque } from '../deque'
4b628b48 7import {
c3719753
JB
8 type ErrorHandler,
9 type ExitHandler,
4b628b48
JB
10 type IWorker,
11 type IWorkerNode,
c3719753
JB
12 type MessageHandler,
13 type OnlineHandler,
f3a91bac 14 type StrategyData,
4b628b48 15 type WorkerInfo,
c3719753 16 type WorkerNodeOptions,
4b628b48
JB
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20} from './worker'
c3719753 21import { checkWorkerNodeArguments, createWorker } from './utils'
4b628b48 22
60664f48
JB
23/**
24 * Worker node.
25 *
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
28 */
4b628b48 29export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 30 extends EventEmitter
9f95d5eb 31 implements IWorkerNode<Worker, Data> {
671d5154 32 /** @inheritdoc */
4b628b48 33 public readonly worker: Worker
671d5154 34 /** @inheritdoc */
4b628b48 35 public readonly info: WorkerInfo
671d5154 36 /** @inheritdoc */
4b628b48 37 public usage: WorkerUsage
20c6f652 38 /** @inheritdoc */
f3a91bac
JB
39 public strategyData?: StrategyData
40 /** @inheritdoc */
26fb3c18
JB
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
20c6f652 43 public tasksQueueBackPressureSize: number
574b351d 44 private readonly tasksQueue: Deque<Task<Data>>
47352846 45 private onBackPressureStarted: boolean
26fb3c18 46 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 47
60664f48
JB
48 /**
49 * Constructs a new worker node.
50 *
c3719753
JB
51 * @param type - The worker type.
52 * @param filePath - The worker file path.
53 * @param opts - The worker node options.
60664f48 54 */
c3719753 55 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 56 super()
c3719753
JB
57 checkWorkerNodeArguments(type, filePath, opts)
58 this.worker = createWorker<Worker>(type, filePath, {
59 env: opts.env,
60 workerOptions: opts.workerOptions
61 })
62 this.info = this.initWorkerInfo(this.worker)
26fb3c18 63 this.usage = this.initWorkerUsage()
75de9f41 64 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
65 this.messageChannel = new MessageChannel()
66 }
c3719753 67 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
26fb3c18 68 this.tasksQueue = new Deque<Task<Data>>()
47352846 69 this.onBackPressureStarted = false
26fb3c18 70 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
71 }
72
73 /** @inheritdoc */
74 public tasksQueueSize (): number {
75 return this.tasksQueue.size
76 }
77
4b628b48
JB
78 /** @inheritdoc */
79 public enqueueTask (task: Task<Data>): number {
72695f86 80 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 81 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 82 this.onBackPressureStarted = true
e1c2dba7 83 this.emit('backPressure', { workerId: this.info.id as number })
47352846 84 this.onBackPressureStarted = false
72695f86
JB
85 }
86 return tasksQueueSize
87 }
88
89 /** @inheritdoc */
90 public unshiftTask (task: Task<Data>): number {
91 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 92 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 93 this.onBackPressureStarted = true
e1c2dba7 94 this.emit('backPressure', { workerId: this.info.id as number })
47352846 95 this.onBackPressureStarted = false
72695f86
JB
96 }
97 return tasksQueueSize
4b628b48
JB
98 }
99
100 /** @inheritdoc */
101 public dequeueTask (): Task<Data> | undefined {
463226a4 102 return this.tasksQueue.shift()
4b628b48
JB
103 }
104
72695f86
JB
105 /** @inheritdoc */
106 public popTask (): Task<Data> | undefined {
463226a4 107 return this.tasksQueue.pop()
72695f86
JB
108 }
109
4b628b48
JB
110 /** @inheritdoc */
111 public clearTasksQueue (): void {
112 this.tasksQueue.clear()
113 }
114
671d5154
JB
115 /** @inheritdoc */
116 public hasBackPressure (): boolean {
8735b4e5 117 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
118 }
119
ff128cc9 120 /** @inheritdoc */
4b628b48
JB
121 public resetUsage (): void {
122 this.usage = this.initWorkerUsage()
db0e38ee 123 this.taskFunctionsUsage.clear()
ff128cc9
JB
124 }
125
3f09ed9f
JB
126 /** @inheritdoc */
127 public closeChannel (): void {
7884d183 128 if (this.messageChannel != null) {
3ff2b910
JB
129 this.messageChannel.port1.unref()
130 this.messageChannel.port2.unref()
131 this.messageChannel.port1.close()
132 this.messageChannel.port2.close()
7884d183 133 delete this.messageChannel
3f09ed9f
JB
134 }
135 }
136
c3719753
JB
137 /** @inheritdoc */
138 public registerWorkerEventHandler (
139 event: string,
140 listener:
141 | OnlineHandler<Worker>
142 | MessageHandler<Worker>
143 | ErrorHandler<Worker>
144 | ExitHandler<Worker>
145 ): void {
146 this.worker.on(event, listener)
147 }
148
149 /** @inheritdoc */
150 public registerOnceWorkerEventHandler (
151 event: string,
152 listener:
153 | OnlineHandler<Worker>
154 | MessageHandler<Worker>
155 | ErrorHandler<Worker>
156 | ExitHandler<Worker>
157 ): void {
158 this.worker.once(event, listener)
159 }
160
ff128cc9 161 /** @inheritdoc */
db0e38ee 162 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 163 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 164 throw new Error(
db0e38ee 165 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
166 )
167 }
b558f6b5 168 if (
6703b9f4
JB
169 Array.isArray(this.info.taskFunctionNames) &&
170 this.info.taskFunctionNames.length < 3
b558f6b5 171 ) {
db0e38ee
JB
172 throw new Error(
173 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
174 )
175 }
176 if (name === DEFAULT_TASK_NAME) {
6703b9f4 177 name = this.info.taskFunctionNames[1]
b558f6b5 178 }
db0e38ee
JB
179 if (!this.taskFunctionsUsage.has(name)) {
180 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 181 }
db0e38ee 182 return this.taskFunctionsUsage.get(name)
4b628b48
JB
183 }
184
adee6053
JB
185 /** @inheritdoc */
186 public deleteTaskFunctionWorkerUsage (name: string): boolean {
187 return this.taskFunctionsUsage.delete(name)
188 }
189
75de9f41 190 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 191 return {
75de9f41
JB
192 id: getWorkerId(worker),
193 type: getWorkerType(worker) as WorkerType,
4b628b48 194 dynamic: false,
7884d183 195 ready: false
4b628b48
JB
196 }
197 }
198
199 private initWorkerUsage (): WorkerUsage {
200 const getTasksQueueSize = (): number => {
dd951876 201 return this.tasksQueue.size
4b628b48 202 }
bf4ef2ca 203 const getTasksQueueMaxSize = (): number => {
dd951876 204 return this.tasksQueue.maxSize
4b628b48
JB
205 }
206 return {
207 tasks: {
208 executed: 0,
209 executing: 0,
210 get queued (): number {
211 return getTasksQueueSize()
212 },
213 get maxQueued (): number {
bf4ef2ca 214 return getTasksQueueMaxSize()
4b628b48 215 },
463226a4 216 sequentiallyStolen: 0,
68cbdc84 217 stolen: 0,
4b628b48
JB
218 failed: 0
219 },
220 runTime: {
c52475b8 221 history: new CircularArray<number>()
4b628b48
JB
222 },
223 waitTime: {
c52475b8 224 history: new CircularArray<number>()
4b628b48
JB
225 },
226 elu: {
227 idle: {
c52475b8 228 history: new CircularArray<number>()
4b628b48
JB
229 },
230 active: {
c52475b8 231 history: new CircularArray<number>()
4b628b48
JB
232 }
233 }
234 }
235 }
236
db0e38ee 237 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
238 const getTaskFunctionQueueSize = (): number => {
239 let taskFunctionQueueSize = 0
b25a42cd 240 for (const task of this.tasksQueue) {
dd92a715 241 if (
e5ece61d 242 (task.name === DEFAULT_TASK_NAME &&
6703b9f4 243 name === (this.info.taskFunctionNames as string[])[1]) ||
e5ece61d 244 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 245 ) {
e5ece61d 246 ++taskFunctionQueueSize
b25a42cd
JB
247 }
248 }
e5ece61d 249 return taskFunctionQueueSize
b25a42cd
JB
250 }
251 return {
252 tasks: {
253 executed: 0,
254 executing: 0,
255 get queued (): number {
e5ece61d 256 return getTaskFunctionQueueSize()
b25a42cd 257 },
463226a4 258 sequentiallyStolen: 0,
68cbdc84 259 stolen: 0,
b25a42cd
JB
260 failed: 0
261 },
262 runTime: {
c52475b8 263 history: new CircularArray<number>()
b25a42cd
JB
264 },
265 waitTime: {
c52475b8 266 history: new CircularArray<number>()
b25a42cd
JB
267 },
268 elu: {
269 idle: {
c52475b8 270 history: new CircularArray<number>()
b25a42cd
JB
271 },
272 active: {
c52475b8 273 history: new CircularArray<number>()
b25a42cd
JB
274 }
275 }
276 }
277 }
4b628b48 278}