feat: add `queueMaxSize` option to tasks queue options
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME } from '../utils'
6 import {
7 type IWorker,
8 type IWorkerNode,
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13 } from './worker'
14
15 /**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
21 export class WorkerNode<Worker extends IWorker, Data = unknown>
22 implements IWorkerNode<Worker, Data> {
23 /** @inheritdoc */
24 public readonly worker: Worker
25 /** @inheritdoc */
26 public readonly info: WorkerInfo
27 /** @inheritdoc */
28 public messageChannel?: MessageChannel
29 /** @inheritdoc */
30 public usage: WorkerUsage
31 /** @inheritdoc */
32 public tasksQueueBackPressureSize: number
33 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
34 private readonly tasksQueue: Queue<Task<Data>>
35
36 /**
37 * Constructs a new worker node.
38 *
39 * @param worker - The worker.
40 * @param workerType - The worker type.
41 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
42 */
43 constructor (
44 worker: Worker,
45 workerType: WorkerType,
46 tasksQueueBackPressureSize: number
47 ) {
48 if (worker == null) {
49 throw new TypeError('Cannot construct a worker node without a worker')
50 }
51 if (workerType == null) {
52 throw new TypeError(
53 'Cannot construct a worker node without a worker type'
54 )
55 }
56 if (tasksQueueBackPressureSize == null) {
57 throw new TypeError(
58 'Cannot construct a worker node without a tasks queue back pressure size'
59 )
60 }
61 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
62 throw new TypeError(
63 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
64 )
65 }
66 this.worker = worker
67 this.info = this.initWorkerInfo(worker, workerType)
68 if (workerType === WorkerTypes.thread) {
69 this.messageChannel = new MessageChannel()
70 }
71 this.usage = this.initWorkerUsage()
72 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
73 this.tasksQueue = new Queue<Task<Data>>()
74 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
82 /**
83 * Tasks queue maximum size.
84 *
85 * @returns The tasks queue maximum size.
86 */
87 private tasksQueueMaxSize (): number {
88 return this.tasksQueue.maxSize
89 }
90
91 /** @inheritdoc */
92 public enqueueTask (task: Task<Data>): number {
93 return this.tasksQueue.enqueue(task)
94 }
95
96 /** @inheritdoc */
97 public dequeueTask (): Task<Data> | undefined {
98 return this.tasksQueue.dequeue()
99 }
100
101 /** @inheritdoc */
102 public clearTasksQueue (): void {
103 this.tasksQueue.clear()
104 }
105
106 /** @inheritdoc */
107 public hasBackPressure (): boolean {
108 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
109 }
110
111 /** @inheritdoc */
112 public resetUsage (): void {
113 this.usage = this.initWorkerUsage()
114 this.taskFunctionsUsage.clear()
115 }
116
117 /** @inheritdoc */
118 public closeChannel (): void {
119 if (this.messageChannel != null) {
120 this.messageChannel?.port1.unref()
121 this.messageChannel?.port2.unref()
122 this.messageChannel?.port1.close()
123 this.messageChannel?.port2.close()
124 delete this.messageChannel
125 }
126 }
127
128 /** @inheritdoc */
129 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
130 if (!Array.isArray(this.info.taskFunctions)) {
131 throw new Error(
132 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
133 )
134 }
135 if (
136 Array.isArray(this.info.taskFunctions) &&
137 this.info.taskFunctions.length < 3
138 ) {
139 throw new Error(
140 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
141 )
142 }
143 if (name === DEFAULT_TASK_NAME) {
144 name = this.info.taskFunctions[1]
145 }
146 if (!this.taskFunctionsUsage.has(name)) {
147 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
148 }
149 return this.taskFunctionsUsage.get(name)
150 }
151
152 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
153 return {
154 id: this.getWorkerId(worker, workerType),
155 type: workerType,
156 dynamic: false,
157 ready: false
158 }
159 }
160
161 private initWorkerUsage (): WorkerUsage {
162 const getTasksQueueSize = (): number => {
163 return this.tasksQueueSize()
164 }
165 const getTasksQueueMaxSize = (): number => {
166 return this.tasksQueueMaxSize()
167 }
168 return {
169 tasks: {
170 executed: 0,
171 executing: 0,
172 get queued (): number {
173 return getTasksQueueSize()
174 },
175 get maxQueued (): number {
176 return getTasksQueueMaxSize()
177 },
178 failed: 0
179 },
180 runTime: {
181 history: new CircularArray()
182 },
183 waitTime: {
184 history: new CircularArray()
185 },
186 elu: {
187 idle: {
188 history: new CircularArray()
189 },
190 active: {
191 history: new CircularArray()
192 }
193 }
194 }
195 }
196
197 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
198 const getTaskFunctionQueueSize = (): number => {
199 let taskFunctionQueueSize = 0
200 for (const task of this.tasksQueue) {
201 if (
202 (task.name === DEFAULT_TASK_NAME &&
203 name === (this.info.taskFunctions as string[])[1]) ||
204 (task.name !== DEFAULT_TASK_NAME && name === task.name)
205 ) {
206 ++taskFunctionQueueSize
207 }
208 }
209 return taskFunctionQueueSize
210 }
211 return {
212 tasks: {
213 executed: 0,
214 executing: 0,
215 get queued (): number {
216 return getTaskFunctionQueueSize()
217 },
218 failed: 0
219 },
220 runTime: {
221 history: new CircularArray()
222 },
223 waitTime: {
224 history: new CircularArray()
225 },
226 elu: {
227 idle: {
228 history: new CircularArray()
229 },
230 active: {
231 history: new CircularArray()
232 }
233 }
234 }
235 }
236
237 /**
238 * Gets the worker id.
239 *
240 * @param worker - The worker.
241 * @param workerType - The worker type.
242 * @returns The worker id.
243 */
244 private getWorkerId (
245 worker: Worker,
246 workerType: WorkerType
247 ): number | undefined {
248 if (workerType === WorkerTypes.thread) {
249 return worker.threadId
250 } else if (workerType === WorkerTypes.cluster) {
251 return worker.id
252 }
253 }
254 }