refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
6 import { Deque } from '../deque'
7 import {
8 type ErrorHandler,
9 type ExitHandler,
10 type IWorker,
11 type IWorkerNode,
12 type MessageHandler,
13 type OnlineHandler,
14 type StrategyData,
15 type WorkerInfo,
16 type WorkerNodeOptions,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20 } from './worker'
21 import { checkWorkerNodeArguments, createWorker } from './utils'
22
23 /**
24 * Worker node.
25 *
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
28 */
29 export class WorkerNode<Worker extends IWorker, Data = unknown>
30 extends EventEmitter
31 implements IWorkerNode<Worker, Data> {
32 /** @inheritdoc */
33 public readonly worker: Worker
34 /** @inheritdoc */
35 public readonly info: WorkerInfo
36 /** @inheritdoc */
37 public usage: WorkerUsage
38 /** @inheritdoc */
39 public strategyData?: StrategyData
40 /** @inheritdoc */
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
43 public tasksQueueBackPressureSize: number
44 private readonly tasksQueue: Deque<Task<Data>>
45 private onBackPressureStarted: boolean
46 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
47
48 /**
49 * Constructs a new worker node.
50 *
51 * @param type - The worker type.
52 * @param filePath - The worker file path.
53 * @param opts - The worker node options.
54 */
55 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
56 super()
57 checkWorkerNodeArguments(type, filePath, opts)
58 this.worker = createWorker<Worker>(type, filePath, {
59 env: opts.env,
60 workerOptions: opts.workerOptions
61 })
62 this.info = this.initWorkerInfo(this.worker)
63 this.usage = this.initWorkerUsage()
64 if (this.info.type === WorkerTypes.thread) {
65 this.messageChannel = new MessageChannel()
66 }
67 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
68 this.tasksQueue = new Deque<Task<Data>>()
69 this.onBackPressureStarted = false
70 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
71 }
72
73 /** @inheritdoc */
74 public tasksQueueSize (): number {
75 return this.tasksQueue.size
76 }
77
78 /** @inheritdoc */
79 public enqueueTask (task: Task<Data>): number {
80 const tasksQueueSize = this.tasksQueue.push(task)
81 if (this.hasBackPressure() && !this.onBackPressureStarted) {
82 this.onBackPressureStarted = true
83 this.emit('backPressure', { workerId: this.info.id as number })
84 this.onBackPressureStarted = false
85 }
86 return tasksQueueSize
87 }
88
89 /** @inheritdoc */
90 public unshiftTask (task: Task<Data>): number {
91 const tasksQueueSize = this.tasksQueue.unshift(task)
92 if (this.hasBackPressure() && !this.onBackPressureStarted) {
93 this.onBackPressureStarted = true
94 this.emit('backPressure', { workerId: this.info.id as number })
95 this.onBackPressureStarted = false
96 }
97 return tasksQueueSize
98 }
99
100 /** @inheritdoc */
101 public dequeueTask (): Task<Data> | undefined {
102 return this.tasksQueue.shift()
103 }
104
105 /** @inheritdoc */
106 public popTask (): Task<Data> | undefined {
107 return this.tasksQueue.pop()
108 }
109
110 /** @inheritdoc */
111 public clearTasksQueue (): void {
112 this.tasksQueue.clear()
113 }
114
115 /** @inheritdoc */
116 public hasBackPressure (): boolean {
117 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
118 }
119
120 /** @inheritdoc */
121 public resetUsage (): void {
122 this.usage = this.initWorkerUsage()
123 this.taskFunctionsUsage.clear()
124 }
125
126 /** @inheritdoc */
127 public closeChannel (): void {
128 if (this.messageChannel != null) {
129 this.messageChannel.port1.unref()
130 this.messageChannel.port2.unref()
131 this.messageChannel.port1.close()
132 this.messageChannel.port2.close()
133 delete this.messageChannel
134 }
135 }
136
137 /** @inheritdoc */
138 public registerWorkerEventHandler (
139 event: string,
140 listener:
141 | OnlineHandler<Worker>
142 | MessageHandler<Worker>
143 | ErrorHandler<Worker>
144 | ExitHandler<Worker>
145 ): void {
146 this.worker.on(event, listener)
147 }
148
149 /** @inheritdoc */
150 public registerOnceWorkerEventHandler (
151 event: string,
152 listener:
153 | OnlineHandler<Worker>
154 | MessageHandler<Worker>
155 | ErrorHandler<Worker>
156 | ExitHandler<Worker>
157 ): void {
158 this.worker.once(event, listener)
159 }
160
161 /** @inheritdoc */
162 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
163 if (!Array.isArray(this.info.taskFunctionNames)) {
164 throw new Error(
165 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
166 )
167 }
168 if (
169 Array.isArray(this.info.taskFunctionNames) &&
170 this.info.taskFunctionNames.length < 3
171 ) {
172 throw new Error(
173 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
174 )
175 }
176 if (name === DEFAULT_TASK_NAME) {
177 name = this.info.taskFunctionNames[1]
178 }
179 if (!this.taskFunctionsUsage.has(name)) {
180 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
181 }
182 return this.taskFunctionsUsage.get(name)
183 }
184
185 /** @inheritdoc */
186 public deleteTaskFunctionWorkerUsage (name: string): boolean {
187 return this.taskFunctionsUsage.delete(name)
188 }
189
190 private initWorkerInfo (worker: Worker): WorkerInfo {
191 return {
192 id: getWorkerId(worker),
193 type: getWorkerType(worker) as WorkerType,
194 dynamic: false,
195 ready: false
196 }
197 }
198
199 private initWorkerUsage (): WorkerUsage {
200 const getTasksQueueSize = (): number => {
201 return this.tasksQueue.size
202 }
203 const getTasksQueueMaxSize = (): number => {
204 return this.tasksQueue.maxSize
205 }
206 return {
207 tasks: {
208 executed: 0,
209 executing: 0,
210 get queued (): number {
211 return getTasksQueueSize()
212 },
213 get maxQueued (): number {
214 return getTasksQueueMaxSize()
215 },
216 sequentiallyStolen: 0,
217 stolen: 0,
218 failed: 0
219 },
220 runTime: {
221 history: new CircularArray<number>()
222 },
223 waitTime: {
224 history: new CircularArray<number>()
225 },
226 elu: {
227 idle: {
228 history: new CircularArray<number>()
229 },
230 active: {
231 history: new CircularArray<number>()
232 }
233 }
234 }
235 }
236
237 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
238 const getTaskFunctionQueueSize = (): number => {
239 let taskFunctionQueueSize = 0
240 for (const task of this.tasksQueue) {
241 if (
242 (task.name === DEFAULT_TASK_NAME &&
243 name === (this.info.taskFunctionNames as string[])[1]) ||
244 (task.name !== DEFAULT_TASK_NAME && name === task.name)
245 ) {
246 ++taskFunctionQueueSize
247 }
248 }
249 return taskFunctionQueueSize
250 }
251 return {
252 tasks: {
253 executed: 0,
254 executing: 0,
255 get queued (): number {
256 return getTaskFunctionQueueSize()
257 },
258 sequentiallyStolen: 0,
259 stolen: 0,
260 failed: 0
261 },
262 runTime: {
263 history: new CircularArray<number>()
264 },
265 waitTime: {
266 history: new CircularArray<number>()
267 },
268 elu: {
269 idle: {
270 history: new CircularArray<number>()
271 },
272 active: {
273 history: new CircularArray<number>()
274 }
275 }
276 }
277 }
278 }