fix: ensure worker node cannot be instantiated without proper arguments
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics, its
 * tasks queue and, for thread workers, a dedicated message channel.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public usage: WorkerUsage
  /** Per task function usage statistics, lazily created and keyed by task function name. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Queue<Task<Data>>
  /** Tasks queue size from which back pressure is reported: the pool maximum size squared. */
  private readonly tasksQueueBackPressureSize: number

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @param poolMaxSize - The pool maximum size.
   * @throws Error if `worker`, `workerType` or `poolMaxSize` is null or undefined, or if `poolMaxSize` is NaN.
   */
  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
    if (worker == null) {
      throw new Error('Cannot construct a worker node without a worker')
    }
    if (workerType == null) {
      throw new Error('Cannot construct a worker node without a worker type')
    }
    if (poolMaxSize == null) {
      throw new Error(
        'Cannot construct a worker node without a pool maximum size'
      )
    }
    if (isNaN(poolMaxSize)) {
      throw new Error(
        'Cannot construct a worker node with a NaN pool maximum size'
      )
    }
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    // Only thread workers get a dedicated message channel.
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
    this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const messageChannel = this.messageChannel
    if (messageChannel != null) {
      messageChannel.port1.unref()
      messageChannel.port2.unref()
      messageChannel.port1.close()
      messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the second entry of the task
    // function names list before the usage lookup.
    if (name === DEFAULT_TASK_NAME) {
      name = taskFunctions[1]
    }
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` task counters
   * are live views over the tasks queue.
   *
   * @returns The worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Inside the object literal getters below `this` is the literal itself,
    // hence the arrow functions capturing the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for the given task function whose `queued` counter
   * is computed on access by scanning the tasks queue.
   *
   * @param name - The task function name.
   * @returns The task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // getTaskFunctionWorkerUsage() resolves the default task name to the
      // second entry of the task function names list, so tasks queued under
      // the default name must be counted for that task function as well.
      const isDefaultTaskFunction = name === this.info.taskFunctions?.[1]
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          task.name === name ||
          (isDefaultTaskFunction && task.name === DEFAULT_TASK_NAME)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker `threadId` for a thread worker, the worker `id` for a cluster worker, `undefined` otherwise.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    } else if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    return undefined
  }
}