feat: fire tasks stealing at worker node idling
[poolifier.git] / src / pools / worker-node.ts
Commit | Line | Data
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48
JB
13import {
14 type IWorker,
15 type IWorkerNode,
f3a91bac 16 type StrategyData,
4b628b48 17 type WorkerInfo,
9f95d5eb 18 type WorkerNodeEventDetail,
4b628b48
JB
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22} from './worker'
9a38f99e 23import { checkWorkerNodeArguments } from './utils'
4b628b48 24
60664f48
JB
/**
 * Worker node.
 *
 * Wraps a worker together with its information, its usage statistics and its
 * tasks queue. It is an `EventTarget` that dispatches a `backPressure` event
 * when the tasks queue reaches the back pressure size, and an `idleWorkerNode`
 * event while the node has neither executing nor queued tasks.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  extends EventTarget
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** Queue of the tasks waiting to be executed by this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /**
   * Whether a `backPressure` event is currently being dispatched.
   * Prevents a nested dispatch when a listener adds a task to this node.
   */
  private onBackPressureStarted: boolean
  /**
   * Number of `idleWorkerNode` events dispatched by the current idle
   * notification loop. `0` means that a new loop may be started.
   */
  private onIdleWorkerNodeCount: number
  /** Worker usage statistics per task function name, lazily created. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    super()
    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // A message channel is only created for thread workers.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.onIdleWorkerNodeCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.dispatchBackPressureEventIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.dispatchBackPressureEventIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.startOnIdleWorkerNodeIfNeeded()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.startOnIdleWorkerNodeIfNeeded()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      this.messageChannel.port1.unref()
      this.messageChannel.port2.unref()
      this.messageChannel.port1.close()
      this.messageChannel.port2.close()
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the task function name at index 1.
    if (name === DEFAULT_TASK_NAME) {
      name = taskFunctionNames[1]
    }
    if (!this.taskFunctionsUsage.has(name)) {
      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
    }
    return this.taskFunctionsUsage.get(name)
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Dispatches a `backPressure` event if the tasks queue has back pressure
   * and no `backPressure` event dispatch is already in progress.
   */
  private dispatchBackPressureEventIfNeeded (): void {
    if (this.hasBackPressure() && !this.onBackPressureStarted) {
      this.onBackPressureStarted = true
      try {
        this.dispatchEvent(
          new CustomEvent<WorkerNodeEventDetail>('backPressure', {
            detail: { workerId: this.info.id as number }
          })
        )
      } finally {
        // Always release the guard so later back pressure is still reported.
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Starts the idle notification loop if the worker node is idle and no
   * loop is already running. Errors of the loop are deliberately ignored.
   */
  private startOnIdleWorkerNodeIfNeeded (): void {
    if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
      this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Dispatches `idleWorkerNode` events, separated by an exponentially
   * growing delay, until the worker node has executing or queued tasks.
   *
   * Implemented as a loop rather than a recursive `await` so that the chain
   * of pending promises does not grow while the worker node stays idle.
   */
  private async startOnIdleWorkerNode (): Promise<void> {
    for (;;) {
      if (
        this.onIdleWorkerNodeCount > 0 &&
        (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
      ) {
        // The worker node is busy again: stop and allow a later restart.
        this.onIdleWorkerNodeCount = 0
        return
      }
      ++this.onIdleWorkerNodeCount
      this.dispatchEvent(
        new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
          detail: { workerId: this.info.id as number }
        })
      )
      await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
    }
  }

  /**
   * Whether the worker node has neither executing nor queued tasks.
   *
   * @returns `true` if the worker node is idle, `false` otherwise.
   */
  private isIdle (): boolean {
    return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @returns The worker information, flagged as not dynamic and not ready.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics whose `queued` and `maxQueued`
   * properties are live views on the tasks queue.
   *
   * @returns The worker usage statistics.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture `this`, which is not the worker node inside the getters.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }

  /**
   * Builds zeroed worker usage statistics for a task function, whose
   * `queued` property counts the queued tasks targeting that task function.
   *
   * @param name - The task function name.
   * @returns The task function worker usage statistics.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      // Tasks queued under the default task name are accounted to the task
      // function name at index 1 of the task function names list.
      const defaultTaskFunctionName = this.info.taskFunctionNames?.[1]
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        const taskFunctionName =
          task.name === DEFAULT_TASK_NAME ? defaultTaskFunctionName : task.name
        if (taskFunctionName === name) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray<number>()
      },
      waitTime: {
        history: new CircularArray<number>()
      },
      elu: {
        idle: {
          history: new CircularArray<number>()
        },
        active: {
          history: new CircularArray<number>()
        }
      }
    }
  }
}