Piment Noir Git Repositories - poolifier.git/blame - src/pools/worker-node.ts
docs: publish documentation
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
97231086
JB
4import type { Task } from '../utility-types.js'
5
f12182ad 6import { CircularBuffer } from '../circular-buffer.js'
c6dd1aeb 7import { PriorityQueue } from '../queues/priority-queue.js'
e9ed6eee 8import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
9import {
10 checkWorkerNodeArguments,
11 createWorker,
559c196a 12 initWorkerInfo,
ded253e2 13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f12182ad 18 MeasurementHistorySize,
f3a91bac 19 type StrategyData,
4b628b48 20 type WorkerInfo,
c3719753 21 type WorkerNodeOptions,
4b628b48
JB
22 type WorkerType,
23 WorkerTypes,
3a502712 24 type WorkerUsage,
d35e5717 25} from './worker.js'
4b628b48 26
60664f48
JB
/**
 * Worker node.
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public readonly tasksQueue: PriorityQueue<Task<Data>>
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public readonly worker: Worker
  /**
   * Usage statistics per task function, keyed by task function name.
   * Entries are created lazily by `getTaskFunctionWorkerUsage()` and removed
   * by `deleteTaskFunctionWorkerUsage()`.
   */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   * @param type - The worker type.
   * @param filePath - Path to the worker file.
   * @param opts - The worker node options.
   */
  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
    super()
    // Argument checks run before the worker is created.
    checkWorkerNodeArguments(type, filePath, opts)
    this.worker = createWorker<Worker>(type, filePath, {
      env: opts.env,
      workerOptions: opts.workerOptions,
    })
    this.info = initWorkerInfo(this.worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    // NOTE(review): the non-null assertion assumes checkWorkerNodeArguments()
    // rejects an undefined tasksQueueBackPressureSize - confirm.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
    this.tasksQueue = new PriorityQueue<Task<Data>>(
      opts.tasksQueueBucketSize,
      opts.tasksQueuePriority
    )
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public deleteTask (task: Task<Data>): boolean {
    return this.tasksQueue.delete(task)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /** @inheritdoc */
  public dequeueLastPrioritizedTask (): Task<Data> | undefined {
    // Start from the last empty or partially filled bucket
    return this.dequeueTask(this.tasksQueue.buckets + 1)
  }

  /** @inheritdoc */
  public dequeueTask (bucket?: number): Task<Data> | undefined {
    const task = this.tasksQueue.dequeue(bucket)
    // Reset the back pressure flag as soon as the queue size is back under
    // the back pressure threshold.
    if (!this.hasBackPressure() && this.info.backPressure) {
      this.info.backPressure = false
    }
    return task
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
    // Set the flag and emit the 'backPressure' event only when the flag is
    // not already set, so an already back pressured node does not re-emit on
    // every enqueue.
    if (this.hasBackPressure() && !this.info.backPressure) {
      this.info.backPressure = true
      this.emit('backPressure', { workerId: this.info.id })
    }
    return tasksQueueSize
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): undefined | WorkerUsage {
    const taskFunctionsProperties = this.info.taskFunctionsProperties
    if (!Array.isArray(taskFunctionsProperties)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
      )
    }
    // The list is known to be an array from here on, only its length needs
    // checking.
    if (taskFunctionsProperties.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
      )
    }
    // The default task name is resolved to the name held by the second entry
    // of the task function properties list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionsProperties[1].name : name
    // Lazily create the usage statistics on first access.
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public registerOnceWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // The handler is attached to the underlying worker, not to this node.
    this.worker.once(event, handler)
  }

  /** @inheritdoc */
  public registerWorkerEventHandler (
    event: string,
    handler: EventHandler<Worker>
  ): void {
    // The handler is attached to the underlying worker, not to this node.
    this.worker.on(event, handler)
  }

  /** @inheritdoc */
  public setTasksQueuePriority (enablePriority: boolean): void {
    this.tasksQueue.enablePriority = enablePriority
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public async terminate (): Promise<void> {
    // Register the 'exit' handler before initiating the shutdown so the
    // event cannot fire before the handler is attached.
    const waitWorkerExit = new Promise<void>(resolve => {
      this.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    this.closeMessageChannel()
    // Removes the listeners registered on this node (an EventEmitter); the
    // handlers attached to the worker itself are not affected.
    this.removeAllListeners()
    switch (this.info.type) {
      case WorkerTypes.cluster:
        // Kill the worker once it has disconnected.
        this.registerOnceWorkerEventHandler('disconnect', () => {
          this.worker.kill?.()
        })
        this.worker.disconnect?.()
        break
      case WorkerTypes.thread:
        this.worker.unref?.()
        await this.worker.terminate?.()
        break
    }
    await waitWorkerExit
  }

  /**
   * Closes both ports of the message channel, if any, and removes the
   * `messageChannel` property from the worker node.
   */
  private closeMessageChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Whether the worker node is back pressured or not, i.e. whether the tasks
   * queue size has reached `tasksQueueBackPressureSize`.
   * @returns `true` if the worker node is back pressured, `false` otherwise.
   */
  private hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /**
   * Builds the initial usage statistics of a task function.
   * All counters start at zero and `tasks.queued` is a getter that iterates
   * the tasks queue on each read to count the tasks matching the task
   * function name.
   * @param name - The task function name.
   * @returns The task function usage statistics.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    // Defined as an arrow function since `this` inside the `queued` getter
    // below refers to the `tasks` object literal, not to the worker node.
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function named by the second entry of the properties list; any
        // other task counts for the task function of the same name.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            name === this.info.taskFunctionsProperties![1].name) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      elu: {
        active: {
          history: new CircularBuffer(MeasurementHistorySize),
        },
        idle: {
          history: new CircularBuffer(MeasurementHistorySize),
        },
      },
      runTime: {
        history: new CircularBuffer(MeasurementHistorySize),
      },
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: {
        history: new CircularBuffer(MeasurementHistorySize),
      },
    }
  }

  /**
   * Builds the initial usage statistics of the worker node.
   * All counters start at zero; `tasks.queued` and `tasks.maxQueued` are
   * getters reading the current `size` and `maxSize` of the tasks queue.
   * @returns The worker node usage statistics.
   */
  private initWorkerUsage (): WorkerUsage {
    // Defined as arrow functions since `this` inside the getters below
    // refers to the `tasks` object literal, not to the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      elu: {
        active: {
          history: new CircularBuffer(MeasurementHistorySize),
        },
        idle: {
          history: new CircularBuffer(MeasurementHistorySize),
        },
      },
      runTime: {
        history: new CircularBuffer(MeasurementHistorySize),
      },
      tasks: {
        executed: 0,
        executing: 0,
        failed: 0,
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        get queued (): number {
          return getTasksQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
      },
      waitTime: {
        history: new CircularBuffer(MeasurementHistorySize),
      },
    }
  }
}