fix: fix back pressure detection
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
15/**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
4b628b48
JB
21export class WorkerNode<Worker extends IWorker, Data = unknown>
22implements IWorkerNode<Worker, Data> {
671d5154 23 /** @inheritdoc */
4b628b48 24 public readonly worker: Worker
671d5154 25 /** @inheritdoc */
4b628b48 26 public readonly info: WorkerInfo
671d5154 27 /** @inheritdoc */
7884d183 28 public messageChannel?: MessageChannel
671d5154 29 /** @inheritdoc */
4b628b48 30 public usage: WorkerUsage
db0e38ee 31 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 32 private readonly tasksQueue: Queue<Task<Data>>
e2b31e32 33 private readonly tasksQueueBackPressureSize: number
4b628b48 34
60664f48
JB
35 /**
36 * Constructs a new worker node.
37 *
38 * @param worker - The worker.
39 * @param workerType - The worker type.
671d5154 40 * @param poolMaxSize - The pool maximum size.
60664f48 41 */
671d5154 42 constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
4b628b48
JB
43 this.worker = worker
44 this.info = this.initWorkerInfo(worker, workerType)
7884d183
JB
45 if (workerType === WorkerTypes.thread) {
46 this.messageChannel = new MessageChannel()
47 }
4b628b48 48 this.usage = this.initWorkerUsage()
db0e38ee 49 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48 50 this.tasksQueue = new Queue<Task<Data>>()
e2b31e32 51 this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
4b628b48
JB
52 }
53
54 /** @inheritdoc */
55 public tasksQueueSize (): number {
56 return this.tasksQueue.size
57 }
58
59 /**
eb8afc8a 60 * Tasks queue maximum size.
4b628b48
JB
61 *
62 * @returns The tasks queue maximum size.
63 */
64 private tasksQueueMaxSize (): number {
65 return this.tasksQueue.maxSize
66 }
67
68 /** @inheritdoc */
69 public enqueueTask (task: Task<Data>): number {
70 return this.tasksQueue.enqueue(task)
71 }
72
73 /** @inheritdoc */
74 public dequeueTask (): Task<Data> | undefined {
75 return this.tasksQueue.dequeue()
76 }
77
78 /** @inheritdoc */
79 public clearTasksQueue (): void {
80 this.tasksQueue.clear()
81 }
82
671d5154
JB
83 /** @inheritdoc */
84 public hasBackPressure (): boolean {
e2b31e32 85 return this.tasksQueueSize() >= this.tasksQueueBackPressureSize
671d5154
JB
86 }
87
ff128cc9 88 /** @inheritdoc */
4b628b48
JB
89 public resetUsage (): void {
90 this.usage = this.initWorkerUsage()
db0e38ee 91 this.taskFunctionsUsage.clear()
ff128cc9
JB
92 }
93
3f09ed9f
JB
94 /** @inheritdoc */
95 public closeChannel (): void {
7884d183
JB
96 if (this.messageChannel != null) {
97 this.messageChannel?.port1.unref()
98 this.messageChannel?.port2.unref()
99 this.messageChannel?.port1.close()
100 this.messageChannel?.port2.close()
101 delete this.messageChannel
3f09ed9f
JB
102 }
103 }
104
ff128cc9 105 /** @inheritdoc */
db0e38ee 106 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
a5d15204 107 if (!Array.isArray(this.info.taskFunctions)) {
71b2b6d8 108 throw new Error(
db0e38ee 109 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
110 )
111 }
b558f6b5 112 if (
71b2b6d8 113 Array.isArray(this.info.taskFunctions) &&
db0e38ee 114 this.info.taskFunctions.length < 3
b558f6b5 115 ) {
db0e38ee
JB
116 throw new Error(
117 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
118 )
119 }
120 if (name === DEFAULT_TASK_NAME) {
71b2b6d8 121 name = this.info.taskFunctions[1]
b558f6b5 122 }
db0e38ee
JB
123 if (!this.taskFunctionsUsage.has(name)) {
124 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 125 }
db0e38ee 126 return this.taskFunctionsUsage.get(name)
4b628b48
JB
127 }
128
129 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
130 return {
131 id: this.getWorkerId(worker, workerType),
132 type: workerType,
133 dynamic: false,
7884d183 134 ready: false
4b628b48
JB
135 }
136 }
137
138 private initWorkerUsage (): WorkerUsage {
139 const getTasksQueueSize = (): number => {
140 return this.tasksQueueSize()
141 }
bf4ef2ca 142 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
143 return this.tasksQueueMaxSize()
144 }
145 return {
146 tasks: {
147 executed: 0,
148 executing: 0,
149 get queued (): number {
150 return getTasksQueueSize()
151 },
152 get maxQueued (): number {
bf4ef2ca 153 return getTasksQueueMaxSize()
4b628b48
JB
154 },
155 failed: 0
156 },
157 runTime: {
158 history: new CircularArray()
159 },
160 waitTime: {
161 history: new CircularArray()
162 },
163 elu: {
164 idle: {
165 history: new CircularArray()
166 },
167 active: {
168 history: new CircularArray()
169 }
170 }
171 }
172 }
173
db0e38ee 174 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
b25a42cd
JB
175 const getTaskQueueSize = (): number => {
176 let taskQueueSize = 0
177 for (const task of this.tasksQueue) {
178 if (task.name === name) {
179 ++taskQueueSize
180 }
181 }
182 return taskQueueSize
183 }
184 return {
185 tasks: {
186 executed: 0,
187 executing: 0,
188 get queued (): number {
189 return getTaskQueueSize()
190 },
191 failed: 0
192 },
193 runTime: {
194 history: new CircularArray()
195 },
196 waitTime: {
197 history: new CircularArray()
198 },
199 elu: {
200 idle: {
201 history: new CircularArray()
202 },
203 active: {
204 history: new CircularArray()
205 }
206 }
207 }
208 }
209
4b628b48
JB
210 /**
211 * Gets the worker id.
212 *
213 * @param worker - The worker.
60664f48 214 * @param workerType - The worker type.
4b628b48
JB
215 * @returns The worker id.
216 */
217 private getWorkerId (
218 worker: Worker,
219 workerType: WorkerType
220 ): number | undefined {
221 if (workerType === WorkerTypes.thread) {
222 return worker.threadId
223 } else if (workerType === WorkerTypes.cluster) {
224 return worker.id
225 }
226 }
227}