feat: add tasks stealing algorithm
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
b558f6b5 4import { DEFAULT_TASK_NAME } from '../utils'
574b351d 5import { Deque } from '../deque'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its bookkeeping: worker information, usage
 * statistics (global and per task function), an optional message channel and
 * a double-ended tasks queue with back pressure notification.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: (workerId: number) => void
  /** Per task function usage statistics, lazily created and keyed by task function name. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
  /** Queue of the tasks waiting to be executed by the worker. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether the back pressure callback is currently running on this worker node. */
  private onBackPressureStarted: boolean

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   * @throws {TypeError} If an argument is `null` or `undefined`, or if the tasks queue back pressure size is not a safe integer.
   */
  constructor (
    worker: Worker,
    workerType: WorkerType,
    tasksQueueBackPressureSize: number
  ) {
    if (worker == null) {
      throw new TypeError('Cannot construct a worker node without a worker')
    }
    if (workerType == null) {
      throw new TypeError(
        'Cannot construct a worker node without a worker type'
      )
    }
    if (tasksQueueBackPressureSize == null) {
      throw new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    }
    if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
      throw new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    }
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    // Only thread workers get a dedicated message channel.
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Deque<Task<Data>>()
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.onBackPressureStarted = false
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.notifyBackPressure()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      const { port1, port2 } = this.messageChannel
      port1.unref()
      port2.unref()
      port1.close()
      port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    if (!Array.isArray(this.info.taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (this.info.taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is an alias: statistics are recorded under the
    // name stored in the second entry of the task function names list.
    if (name === DEFAULT_TASK_NAME) {
      name = this.info.taskFunctions[1]
    }
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /**
   * Runs the back pressure callback if one is registered, the tasks queue has
   * back pressure and the callback is not already running on this worker node.
   *
   * The guard flag lives on the instance so that a callback which enqueues
   * tasks on this same worker node does not re-enter itself. It is reset in a
   * `finally` clause so that a throwing callback cannot disable later
   * notifications.
   */
  private notifyBackPressure (): void {
    if (
      this.onBackPressure == null ||
      this.onBackPressureStarted ||
      !this.hasBackPressure()
    ) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.onBackPressure(this.info.id as number)
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` task counters
   * are live getters backed by the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node, since `this` inside the object
    // literal getters below refers to the `tasks` object.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function whose `queued` task counter is
   * a live getter counting the matching tasks currently in the tasks queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Looked up on each evaluation, but only once per evaluation rather
      // than once per queued task.
      const defaultTaskFunctionName = (this.info.taskFunctions as string[])[1]
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function stored in the second entry of the names list.
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME ? defaultTaskFunctionName : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker id: `threadId` for a thread worker, `id` for a cluster worker, `undefined` for any other worker type.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    } else if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
  }
}