perf: optimize task(s) stealing
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
e1c2dba7 1import { EventEmitter } from 'node:events'
ded253e2
JB
2import { MessageChannel } from 'node:worker_threads'
3
d35e5717 4import { CircularArray } from '../circular-array.js'
e41b0271 5import { PriorityQueue } from '../priority-queue.js'
d35e5717 6import type { Task } from '../utility-types.js'
e9ed6eee 7import { DEFAULT_TASK_NAME } from '../utils.js'
ded253e2
JB
8import {
9 checkWorkerNodeArguments,
10 createWorker,
11 getWorkerId,
12 getWorkerType
13} from './utils.js'
4b628b48 14import {
3bcbd4c5 15 type EventHandler,
4b628b48
JB
16 type IWorker,
17 type IWorkerNode,
f3a91bac 18 type StrategyData,
4b628b48 19 type WorkerInfo,
c3719753 20 type WorkerNodeOptions,
4b628b48
JB
21 type WorkerType,
22 WorkerTypes,
23 type WorkerUsage
d35e5717 24} from './worker.js'
4b628b48 25
60664f48
JB
26/**
27 * Worker node.
28 *
29 * @typeParam Worker - Type of worker.
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 */
4b628b48 32export class WorkerNode<Worker extends IWorker, Data = unknown>
e1c2dba7 33 extends EventEmitter
9f95d5eb 34 implements IWorkerNode<Worker, Data> {
671d5154 35 /** @inheritdoc */
4b628b48 36 public readonly worker: Worker
671d5154 37 /** @inheritdoc */
4b628b48 38 public readonly info: WorkerInfo
671d5154 39 /** @inheritdoc */
4b628b48 40 public usage: WorkerUsage
20c6f652 41 /** @inheritdoc */
f3a91bac
JB
42 public strategyData?: StrategyData
43 /** @inheritdoc */
26fb3c18
JB
44 public messageChannel?: MessageChannel
45 /** @inheritdoc */
20c6f652 46 public tasksQueueBackPressureSize: number
95d1a734 47 private readonly tasksQueue: PriorityQueue<Task<Data>>
47352846 48 private onBackPressureStarted: boolean
26fb3c18 49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 50
60664f48
JB
51 /**
52 * Constructs a new worker node.
53 *
c3719753 54 * @param type - The worker type.
9974369e 55 * @param filePath - Path to the worker file.
c3719753 56 * @param opts - The worker node options.
60664f48 57 */
c3719753 58 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
9f95d5eb 59 super()
c3719753
JB
60 checkWorkerNodeArguments(type, filePath, opts)
61 this.worker = createWorker<Worker>(type, filePath, {
62 env: opts.env,
63 workerOptions: opts.workerOptions
64 })
65 this.info = this.initWorkerInfo(this.worker)
26fb3c18 66 this.usage = this.initWorkerUsage()
75de9f41 67 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
68 this.messageChannel = new MessageChannel()
69 }
c63a35a0
JB
70 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
95d1a734 72 this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
47352846 73 this.onBackPressureStarted = false
26fb3c18 74 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
75 }
76
77 /** @inheritdoc */
78 public tasksQueueSize (): number {
79 return this.tasksQueue.size
80 }
81
4b628b48
JB
82 /** @inheritdoc */
83 public enqueueTask (task: Task<Data>): number {
95d1a734 84 const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
9f95d5eb 85 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 86 this.onBackPressureStarted = true
7f0e1334 87 this.emit('backPressure', { workerId: this.info.id })
47352846 88 this.onBackPressureStarted = false
72695f86
JB
89 }
90 return tasksQueueSize
91 }
92
93 /** @inheritdoc */
95d1a734
JB
94 public dequeueTask (bucket?: number): Task<Data> | undefined {
95 return this.tasksQueue.dequeue(bucket)
72695f86
JB
96 }
97
0d4e88b3
JB
98 /** @inheritdoc */
99 public dequeueLastBucketTask (): Task<Data> | undefined {
100 // Start from the last empty or partially filled bucket
101 return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1)
102 }
103
4b628b48
JB
104 /** @inheritdoc */
105 public clearTasksQueue (): void {
106 this.tasksQueue.clear()
107 }
108
671d5154
JB
109 /** @inheritdoc */
110 public hasBackPressure (): boolean {
8735b4e5 111 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
112 }
113
3f09ed9f 114 /** @inheritdoc */
07e0c9e5
JB
115 public async terminate (): Promise<void> {
116 const waitWorkerExit = new Promise<void>(resolve => {
117 this.registerOnceWorkerEventHandler('exit', () => {
118 resolve()
119 })
120 })
121 this.closeMessageChannel()
122 this.removeAllListeners()
839b98b8
JB
123 switch (this.info.type) {
124 case WorkerTypes.thread:
d20cde84 125 this.worker.unref?.()
839b98b8
JB
126 await this.worker.terminate?.()
127 break
128 case WorkerTypes.cluster:
129 this.registerOnceWorkerEventHandler('disconnect', () => {
130 this.worker.kill?.()
131 })
132 this.worker.disconnect?.()
133 break
3f09ed9f 134 }
07e0c9e5 135 await waitWorkerExit
3f09ed9f
JB
136 }
137
c3719753
JB
138 /** @inheritdoc */
139 public registerWorkerEventHandler (
140 event: string,
3bcbd4c5 141 handler: EventHandler<Worker>
c3719753 142 ): void {
88af9bf1 143 this.worker.on(event, handler)
c3719753
JB
144 }
145
146 /** @inheritdoc */
147 public registerOnceWorkerEventHandler (
148 event: string,
3bcbd4c5 149 handler: EventHandler<Worker>
c3719753 150 ): void {
88af9bf1 151 this.worker.once(event, handler)
c3719753
JB
152 }
153
ff128cc9 154 /** @inheritdoc */
db0e38ee 155 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
31847469 156 if (!Array.isArray(this.info.taskFunctionsProperties)) {
71b2b6d8 157 throw new Error(
31847469 158 `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
71b2b6d8
JB
159 )
160 }
b558f6b5 161 if (
31847469
JB
162 Array.isArray(this.info.taskFunctionsProperties) &&
163 this.info.taskFunctionsProperties.length < 3
b558f6b5 164 ) {
db0e38ee 165 throw new Error(
31847469 166 `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
db0e38ee
JB
167 )
168 }
169 if (name === DEFAULT_TASK_NAME) {
31847469 170 name = this.info.taskFunctionsProperties[1].name
b558f6b5 171 }
db0e38ee
JB
172 if (!this.taskFunctionsUsage.has(name)) {
173 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 174 }
db0e38ee 175 return this.taskFunctionsUsage.get(name)
4b628b48
JB
176 }
177
adee6053
JB
178 /** @inheritdoc */
179 public deleteTaskFunctionWorkerUsage (name: string): boolean {
180 return this.taskFunctionsUsage.delete(name)
181 }
182
07e0c9e5
JB
183 private closeMessageChannel (): void {
184 if (this.messageChannel != null) {
185 this.messageChannel.port1.unref()
186 this.messageChannel.port2.unref()
187 this.messageChannel.port1.close()
188 this.messageChannel.port2.close()
189 delete this.messageChannel
190 }
191 }
192
75de9f41 193 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 194 return {
75de9f41 195 id: getWorkerId(worker),
67f3f2d6
JB
196 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
197 type: getWorkerType(worker)!,
4b628b48 198 dynamic: false,
5eb72b9e
JB
199 ready: false,
200 stealing: false
4b628b48
JB
201 }
202 }
203
204 private initWorkerUsage (): WorkerUsage {
205 const getTasksQueueSize = (): number => {
dd951876 206 return this.tasksQueue.size
4b628b48 207 }
bf4ef2ca 208 const getTasksQueueMaxSize = (): number => {
dd951876 209 return this.tasksQueue.maxSize
4b628b48
JB
210 }
211 return {
212 tasks: {
213 executed: 0,
214 executing: 0,
215 get queued (): number {
216 return getTasksQueueSize()
217 },
218 get maxQueued (): number {
bf4ef2ca 219 return getTasksQueueMaxSize()
4b628b48 220 },
463226a4 221 sequentiallyStolen: 0,
68cbdc84 222 stolen: 0,
4b628b48
JB
223 failed: 0
224 },
225 runTime: {
c52475b8 226 history: new CircularArray<number>()
4b628b48
JB
227 },
228 waitTime: {
c52475b8 229 history: new CircularArray<number>()
4b628b48
JB
230 },
231 elu: {
232 idle: {
c52475b8 233 history: new CircularArray<number>()
4b628b48
JB
234 },
235 active: {
c52475b8 236 history: new CircularArray<number>()
4b628b48
JB
237 }
238 }
239 }
240 }
241
db0e38ee 242 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
243 const getTaskFunctionQueueSize = (): number => {
244 let taskFunctionQueueSize = 0
b25a42cd 245 for (const task of this.tasksQueue) {
dd92a715 246 if (
e5ece61d 247 (task.name === DEFAULT_TASK_NAME &&
67f3f2d6 248 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
31847469 249 name === this.info.taskFunctionsProperties![1].name) ||
e5ece61d 250 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 251 ) {
e5ece61d 252 ++taskFunctionQueueSize
b25a42cd
JB
253 }
254 }
e5ece61d 255 return taskFunctionQueueSize
b25a42cd
JB
256 }
257 return {
258 tasks: {
259 executed: 0,
260 executing: 0,
261 get queued (): number {
e5ece61d 262 return getTaskFunctionQueueSize()
b25a42cd 263 },
463226a4 264 sequentiallyStolen: 0,
68cbdc84 265 stolen: 0,
b25a42cd
JB
266 failed: 0
267 },
268 runTime: {
c52475b8 269 history: new CircularArray<number>()
b25a42cd
JB
270 },
271 waitTime: {
c52475b8 272 history: new CircularArray<number>()
b25a42cd
JB
273 },
274 elu: {
275 idle: {
c52475b8 276 history: new CircularArray<number>()
b25a42cd
JB
277 },
278 active: {
c52475b8 279 history: new CircularArray<number>()
b25a42cd
JB
280 }
281 }
282 }
283 }
4b628b48 284}