59c4de7a388e81434c373f1835b144c1b3c88f57
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
6 import { Deque } from '../deque'
7 import {
8 type IWorker,
9 type IWorkerNode,
10 type StrategyData,
11 type WorkerInfo,
12 type WorkerType,
13 WorkerTypes,
14 type WorkerUsage
15 } from './worker'
16 import { checkWorkerNodeArguments } from './utils'
17
/**
 * Worker node.
 *
 * Bundles a worker with its information, its usage statistics and its own
 * tasks queue. A `backPressure` event carrying `{ workerId }` is emitted when
 * a task is added while the tasks queue size is greater than or equal to
 * `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Guard flag set while a `backPressure` event emission is in progress. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    super()
    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of
    // the task function names list.
    const usageName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(usageName)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(usageName)
      this.taskFunctionsUsage.set(usageName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event when the tasks queue has back pressure and
   * no emission is already in progress on this worker node.
   *
   * The guard flag is reset in a `finally` clause: listeners run synchronously
   * and may throw, and a stale flag would silence every later `backPressure`
   * event of this worker node.
   */
  private emitBackPressureIfNeeded (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id as number })
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Builds the initial worker information: not dynamic and not ready.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` task counters
   * are read live from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the getters below,
    // `this` is the `tasks` object literal, not the worker node.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds a zeroed worker usage for one task function. Its `queued` counter
   * walks the tasks queue on every read and counts the tasks belonging to the
   * task function.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function whose name is stored at index 1 of the names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            name === (this.info.taskFunctionNames as string[])[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}