fix: reset worker choice strategies retries count
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics, its tasks
 * queue and, for thread workers only, a dedicated message channel.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public usage: WorkerUsage
  /** Per task function name usage statistics, lazily filled by `getTaskWorkerUsage()`. */
  private readonly tasksUsage: Map<string, WorkerUsage>
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Queue<Task<Data>>
  /** Tasks queue size from which the worker node reports back pressure. */
  private readonly tasksQueueBackPressureSize: number

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @param poolMaxSize - The pool maximum size.
   */
  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    // Only thread workers get a message channel.
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.tasksUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
    // The back pressure threshold is the square of the pool maximum size.
    this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueueSize() >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    // Per task usages are dropped and lazily recreated on next access.
    this.tasksUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const messageChannel = this.messageChannel
    if (messageChannel != null) {
      messageChannel.port1.unref()
      messageChannel.port2.unref()
      messageChannel.port1.close()
      messageChannel.port2.close()
      // Removing the property makes subsequent calls no-ops.
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    // The default task name is resolved to the second entry of the task
    // function names list when there is one.
    // NOTE(review): presumably the first entry is the default name itself - confirm.
    const taskName =
      name === DEFAULT_TASK_NAME && taskFunctions.length > 1
        ? taskFunctions[1]
        : name
    let taskWorkerUsage = this.tasksUsage.get(taskName)
    if (taskWorkerUsage == null) {
      taskWorkerUsage = this.initTaskWorkerUsage(taskName)
      this.tasksUsage.set(taskName, taskWorkerUsage)
    }
    return taskWorkerUsage
  }

  /**
   * Builds the initial worker information: not dynamic and not ready.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` tasks
   * statistics are computed from the tasks queue each time they are read.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the object literal
    // getters below, `this` is the `tasks` object.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      ...this.initMeasurementsUsage()
    }
  }

  /**
   * Builds a zeroed worker usage for one task function name. Its `queued`
   * tasks statistic counts, each time it is read, the queued tasks having
   * that name.
   *
   * @param name - The task function name.
   * @returns The initial task worker usage.
   */
  private initTaskWorkerUsage (name: string): WorkerUsage {
    const getTaskQueueSize = (): number => {
      let taskQueueSize = 0
      for (const task of this.tasksQueue) {
        if (task.name === name) {
          ++taskQueueSize
        }
      }
      return taskQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskQueueSize()
        },
        failed: 0
      },
      ...this.initMeasurementsUsage()
    }
  }

  /**
   * Builds the `runTime`, `waitTime` and `elu` parts shared by the worker
   * usage and the task worker usage, each with a fresh empty history.
   *
   * @returns The initial measurements usage.
   */
  private initMeasurementsUsage (): Pick<
  WorkerUsage,
  'runTime' | 'waitTime' | 'elu'
  > {
    return {
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker `threadId` for a thread worker, the worker `id` for a cluster worker, `undefined` otherwise.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    } else if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    return undefined
  }
}