build(ci): reduce coverage requirements for node 18.x
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
e1c2dba7 2import { EventEmitter } from 'node:events'
4b628b48 3import { CircularArray } from '../circular-array'
5c4d16da 4import type { Task } from '../utility-types'
463226a4 5import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 6import { Deque } from '../deque'
4b628b48
JB
7import {
8 type IWorker,
9 type IWorkerNode,
f3a91bac 10 type StrategyData,
4b628b48
JB
11 type WorkerInfo,
12 type WorkerType,
13 WorkerTypes,
14 type WorkerUsage
15} from './worker'
9a38f99e 16import { checkWorkerNodeArguments } from './utils'
4b628b48 17
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics and its
 * tasks queue. A `backPressure` event carrying `{ workerId }` is emitted when
 * inserting a task brings the tasks queue size to or above
 * `tasksQueueBackPressureSize`.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventEmitter
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether a `backPressure` event emission is currently in progress. */
  private onBackPressureStarted: boolean
  /** Usage statistics per task function name, created lazily on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    super()
    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.shift()
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    return this.tasksQueue.pop()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /**
   * @inheritdoc
   * @throws Error if the task function names list is not yet defined or has
   * less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the second entry of the list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    if (!this.taskFunctionsUsage.has(taskFunctionName)) {
      this.taskFunctionsUsage.set(
        taskFunctionName,
        this.initTaskFunctionWorkerUsage(taskFunctionName)
      )
    }
    return this.taskFunctionsUsage.get(taskFunctionName)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Emits the `backPressure` event if the tasks queue has back pressure and no
   * emission is already in progress.
   *
   * The in-progress flag stops a listener that inserts tasks on this node from
   * triggering nested emissions. It is reset in a `finally` clause because
   * `emit()` rethrows listener errors, which would otherwise leave the flag
   * set and suppress every later event.
   */
  private emitBackPressureIfNeeded (): void {
    if (!this.hasBackPressure() || this.onBackPressureStarted) {
      return
    }
    this.onBackPressureStarted = true
    try {
      this.emit('backPressure', { workerId: this.info.id as number })
    } finally {
      this.onBackPressureStarted = false
    }
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics.
   *
   * @returns The worker usage, whose `queued` and `maxQueued` counters are
   * read from the tasks queue on each access.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the worker node inside the getters.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function.
   *
   * @param name - The task function name.
   * @returns The worker usage, whose `queued` counter walks the tasks queue on
   * each access and counts the tasks belonging to the task function.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // Tasks queued under the default name count for the second entry of
        // the task function names list.
        if (
          (task.name === DEFAULT_TASK_NAME &&
            name === this.info.taskFunctionNames?.[1]) ||
          (task.name !== DEFAULT_TASK_NAME && name === task.name)
        ) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}