refactor: remove unneeded null coalescing
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
15/**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
4b628b48
JB
21export class WorkerNode<Worker extends IWorker, Data = unknown>
22implements IWorkerNode<Worker, Data> {
671d5154 23 /** @inheritdoc */
4b628b48 24 public readonly worker: Worker
671d5154 25 /** @inheritdoc */
4b628b48 26 public readonly info: WorkerInfo
671d5154 27 /** @inheritdoc */
7884d183 28 public messageChannel?: MessageChannel
671d5154 29 /** @inheritdoc */
4b628b48 30 public usage: WorkerUsage
20c6f652
JB
31 /** @inheritdoc */
32 public tasksQueueBackPressureSize: number
db0e38ee 33 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48
JB
34 private readonly tasksQueue: Queue<Task<Data>>
35
60664f48
JB
36 /**
37 * Constructs a new worker node.
38 *
39 * @param worker - The worker.
40 * @param workerType - The worker type.
20c6f652 41 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
60664f48 42 */
20c6f652
JB
43 constructor (
44 worker: Worker,
45 workerType: WorkerType,
46 tasksQueueBackPressureSize: number
47 ) {
8735b4e5 48 if (worker == null) {
e695d66f 49 throw new TypeError('Cannot construct a worker node without a worker')
8735b4e5
JB
50 }
51 if (workerType == null) {
e695d66f
JB
52 throw new TypeError(
53 'Cannot construct a worker node without a worker type'
54 )
8735b4e5 55 }
20c6f652 56 if (tasksQueueBackPressureSize == null) {
e695d66f 57 throw new TypeError(
20c6f652 58 'Cannot construct a worker node without a tasks queue back pressure size'
8735b4e5
JB
59 )
60 }
20c6f652 61 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
e695d66f 62 throw new TypeError(
20c6f652 63 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
8735b4e5
JB
64 )
65 }
4b628b48
JB
66 this.worker = worker
67 this.info = this.initWorkerInfo(worker, workerType)
7884d183
JB
68 if (workerType === WorkerTypes.thread) {
69 this.messageChannel = new MessageChannel()
70 }
4b628b48 71 this.usage = this.initWorkerUsage()
db0e38ee 72 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48 73 this.tasksQueue = new Queue<Task<Data>>()
20c6f652 74 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
4b628b48
JB
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
82 /**
eb8afc8a 83 * Tasks queue maximum size.
4b628b48
JB
84 *
85 * @returns The tasks queue maximum size.
86 */
87 private tasksQueueMaxSize (): number {
88 return this.tasksQueue.maxSize
89 }
90
91 /** @inheritdoc */
92 public enqueueTask (task: Task<Data>): number {
93 return this.tasksQueue.enqueue(task)
94 }
95
96 /** @inheritdoc */
97 public dequeueTask (): Task<Data> | undefined {
98 return this.tasksQueue.dequeue()
99 }
100
101 /** @inheritdoc */
102 public clearTasksQueue (): void {
103 this.tasksQueue.clear()
104 }
105
671d5154
JB
106 /** @inheritdoc */
107 public hasBackPressure (): boolean {
8735b4e5 108 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
109 }
110
ff128cc9 111 /** @inheritdoc */
4b628b48
JB
112 public resetUsage (): void {
113 this.usage = this.initWorkerUsage()
db0e38ee 114 this.taskFunctionsUsage.clear()
ff128cc9
JB
115 }
116
3f09ed9f
JB
117 /** @inheritdoc */
118 public closeChannel (): void {
7884d183
JB
119 if (this.messageChannel != null) {
120 this.messageChannel?.port1.unref()
121 this.messageChannel?.port2.unref()
122 this.messageChannel?.port1.close()
123 this.messageChannel?.port2.close()
124 delete this.messageChannel
3f09ed9f
JB
125 }
126 }
127
ff128cc9 128 /** @inheritdoc */
db0e38ee 129 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
a5d15204 130 if (!Array.isArray(this.info.taskFunctions)) {
71b2b6d8 131 throw new Error(
db0e38ee 132 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
133 )
134 }
b558f6b5 135 if (
71b2b6d8 136 Array.isArray(this.info.taskFunctions) &&
db0e38ee 137 this.info.taskFunctions.length < 3
b558f6b5 138 ) {
db0e38ee
JB
139 throw new Error(
140 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
141 )
142 }
143 if (name === DEFAULT_TASK_NAME) {
71b2b6d8 144 name = this.info.taskFunctions[1]
b558f6b5 145 }
db0e38ee
JB
146 if (!this.taskFunctionsUsage.has(name)) {
147 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 148 }
db0e38ee 149 return this.taskFunctionsUsage.get(name)
4b628b48
JB
150 }
151
152 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
153 return {
154 id: this.getWorkerId(worker, workerType),
155 type: workerType,
156 dynamic: false,
7884d183 157 ready: false
4b628b48
JB
158 }
159 }
160
161 private initWorkerUsage (): WorkerUsage {
162 const getTasksQueueSize = (): number => {
163 return this.tasksQueueSize()
164 }
bf4ef2ca 165 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
166 return this.tasksQueueMaxSize()
167 }
168 return {
169 tasks: {
170 executed: 0,
171 executing: 0,
172 get queued (): number {
173 return getTasksQueueSize()
174 },
175 get maxQueued (): number {
bf4ef2ca 176 return getTasksQueueMaxSize()
4b628b48
JB
177 },
178 failed: 0
179 },
180 runTime: {
181 history: new CircularArray()
182 },
183 waitTime: {
184 history: new CircularArray()
185 },
186 elu: {
187 idle: {
188 history: new CircularArray()
189 },
190 active: {
191 history: new CircularArray()
192 }
193 }
194 }
195 }
196
db0e38ee 197 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
198 const getTaskFunctionQueueSize = (): number => {
199 let taskFunctionQueueSize = 0
b25a42cd 200 for (const task of this.tasksQueue) {
dd92a715 201 if (
e5ece61d
JB
202 (task.name === DEFAULT_TASK_NAME &&
203 name === (this.info.taskFunctions as string[])[1]) ||
204 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 205 ) {
e5ece61d 206 ++taskFunctionQueueSize
b25a42cd
JB
207 }
208 }
e5ece61d 209 return taskFunctionQueueSize
b25a42cd
JB
210 }
211 return {
212 tasks: {
213 executed: 0,
214 executing: 0,
215 get queued (): number {
e5ece61d 216 return getTaskFunctionQueueSize()
b25a42cd
JB
217 },
218 failed: 0
219 },
220 runTime: {
221 history: new CircularArray()
222 },
223 waitTime: {
224 history: new CircularArray()
225 },
226 elu: {
227 idle: {
228 history: new CircularArray()
229 },
230 active: {
231 history: new CircularArray()
232 }
233 }
234 }
235 }
236
4b628b48
JB
237 /**
238 * Gets the worker id.
239 *
240 * @param worker - The worker.
60664f48 241 * @param workerType - The worker type.
4b628b48
JB
242 * @returns The worker id.
243 */
244 private getWorkerId (
245 worker: Worker,
246 workerType: WorkerType
247 ): number | undefined {
248 if (workerType === WorkerTypes.thread) {
249 return worker.threadId
250 } else if (workerType === WorkerTypes.cluster) {
251 return worker.id
252 }
253 }
254}