fix: worker usage for the default task function
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics, its
 * tasks queue and, for thread workers, a dedicated message channel.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public usage: WorkerUsage
  /** Usage statistics per task function name, created lazily on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
  /** Tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Queue<Task<Data>>
  /** Tasks queue size from which the worker node reports back pressure. */
  private readonly tasksQueueBackPressureSize: number

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @param poolMaxSize - The pool maximum size.
   * @throws TypeError If the worker, the worker type or the pool maximum size
   * is missing, or if the pool maximum size is not a safe integer.
   */
  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (workerType == null) {
      throw new TypeError(
        'Cannot construct a worker node without a worker type'
      )
    }
    if (poolMaxSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a pool maximum size'
      )
    }
    if (!Number.isSafeInteger(poolMaxSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a pool maximum size that is not an integer'
      )
    }
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    // Only thread workers get a dedicated message channel.
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
    // The back pressure threshold is the square of the pool maximum size.
    this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const channel = this.messageChannel
    if (channel != null) {
      // Unref both ports before closing them, then drop the channel reference
      // so that a later call is a no-op.
      channel.port1.unref()
      channel.port2.unref()
      channel.port1.close()
      channel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * Gets the usage statistics of the given task function, creating them on
   * first access. The default task name is resolved to the second entry of
   * the task function names list.
   *
   * @param name - The task function name.
   * @returns The task function worker usage.
   * @throws Error If the task function names list is not yet defined or has
   * less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    // NOTE(review): presumably the list is laid out as
    // [DEFAULT_TASK_NAME, <default task function name>, ...others], so fewer
    // than 3 entries means a single task function - confirm against the worker.
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    if (name === DEFAULT_TASK_NAME) {
      name = taskFunctions[1]
    }
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /**
   * Builds the initial worker information: not dynamic and not ready.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics whose queued counters are live
   * views on the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Inside the getters below `this` is the `tasks` object literal, hence the
    // arrow functions capturing the worker node.
    const readQueueSize = (): number => this.tasksQueueSize()
    const readQueueMaxSize = (): number => this.tasksQueueMaxSize()
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return readQueueSize()
        },
        get maxQueued (): number {
          return readQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function. The queued counter
   * is computed on each read by scanning the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const countQueuedTasks = (): number => {
      // Read on every evaluation rather than captured once at creation time.
      const defaultTaskFunctionName = this.info.taskFunctions?.[1]
      let queuedTasks = 0
      for (const task of this.tasksQueue) {
        // The default task name and the second task function name designate
        // the same task function: match them in both directions, so that tasks
        // queued under the default task name are accounted in the usage that
        // getTaskFunctionWorkerUsage() registers under the resolved name.
        const isDefaultAlias =
          defaultTaskFunctionName != null &&
          ((name === DEFAULT_TASK_NAME &&
            task.name === defaultTaskFunctionName) ||
            (task.name === DEFAULT_TASK_NAME &&
              name === defaultTaskFunctionName))
        if (task.name === name || isDefaultAlias) {
          ++queuedTasks
        }
      }
      return queuedTasks
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return countQueuedTasks()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The thread id for a thread worker, the cluster worker id for a
   * cluster worker, `undefined` for any other worker type.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    }
    if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    return undefined
  }
}