fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
463226a4 4import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
574b351d 5import { Deque } from '../deque'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
f3a91bac 9 type StrategyData,
4b628b48 10 type WorkerInfo,
9f95d5eb 11 type WorkerNodeEventDetail,
4b628b48
JB
12 type WorkerType,
13 WorkerTypes,
14 type WorkerUsage
15} from './worker'
9a38f99e 16import { checkWorkerNodeArguments } from './utils'
4b628b48 17
60664f48
JB
18/**
19 * Worker node.
20 *
21 * @typeParam Worker - Type of worker.
22 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
23 */
4b628b48 24export class WorkerNode<Worker extends IWorker, Data = unknown>
9f95d5eb
JB
25 extends EventTarget
26 implements IWorkerNode<Worker, Data> {
671d5154 27 /** @inheritdoc */
4b628b48 28 public readonly worker: Worker
671d5154 29 /** @inheritdoc */
4b628b48 30 public readonly info: WorkerInfo
671d5154 31 /** @inheritdoc */
4b628b48 32 public usage: WorkerUsage
20c6f652 33 /** @inheritdoc */
f3a91bac
JB
34 public strategyData?: StrategyData
35 /** @inheritdoc */
26fb3c18
JB
36 public messageChannel?: MessageChannel
37 /** @inheritdoc */
20c6f652 38 public tasksQueueBackPressureSize: number
574b351d 39 private readonly tasksQueue: Deque<Task<Data>>
47352846 40 private onBackPressureStarted: boolean
26fb3c18 41 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 42
60664f48
JB
43 /**
44 * Constructs a new worker node.
45 *
46 * @param worker - The worker.
20c6f652 47 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
60664f48 48 */
75de9f41 49 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
9f95d5eb 50 super()
9a38f99e 51 checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
4b628b48 52 this.worker = worker
75de9f41 53 this.info = this.initWorkerInfo(worker)
26fb3c18 54 this.usage = this.initWorkerUsage()
75de9f41 55 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
56 this.messageChannel = new MessageChannel()
57 }
20c6f652 58 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
26fb3c18 59 this.tasksQueue = new Deque<Task<Data>>()
47352846 60 this.onBackPressureStarted = false
26fb3c18 61 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
62 }
63
64 /** @inheritdoc */
65 public tasksQueueSize (): number {
66 return this.tasksQueue.size
67 }
68
4b628b48
JB
69 /** @inheritdoc */
70 public enqueueTask (task: Task<Data>): number {
72695f86 71 const tasksQueueSize = this.tasksQueue.push(task)
9f95d5eb 72 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 73 this.onBackPressureStarted = true
9f95d5eb 74 this.dispatchEvent(
b5e75be8 75 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
9f95d5eb
JB
76 detail: { workerId: this.info.id as number }
77 })
78 )
47352846 79 this.onBackPressureStarted = false
72695f86
JB
80 }
81 return tasksQueueSize
82 }
83
84 /** @inheritdoc */
85 public unshiftTask (task: Task<Data>): number {
86 const tasksQueueSize = this.tasksQueue.unshift(task)
9f95d5eb 87 if (this.hasBackPressure() && !this.onBackPressureStarted) {
47352846 88 this.onBackPressureStarted = true
9f95d5eb 89 this.dispatchEvent(
b5e75be8 90 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
9f95d5eb
JB
91 detail: { workerId: this.info.id as number }
92 })
93 )
47352846 94 this.onBackPressureStarted = false
72695f86
JB
95 }
96 return tasksQueueSize
4b628b48
JB
97 }
98
99 /** @inheritdoc */
100 public dequeueTask (): Task<Data> | undefined {
463226a4 101 return this.tasksQueue.shift()
4b628b48
JB
102 }
103
72695f86
JB
104 /** @inheritdoc */
105 public popTask (): Task<Data> | undefined {
463226a4 106 return this.tasksQueue.pop()
72695f86
JB
107 }
108
4b628b48
JB
109 /** @inheritdoc */
110 public clearTasksQueue (): void {
111 this.tasksQueue.clear()
112 }
113
671d5154
JB
114 /** @inheritdoc */
115 public hasBackPressure (): boolean {
8735b4e5 116 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
117 }
118
ff128cc9 119 /** @inheritdoc */
4b628b48
JB
120 public resetUsage (): void {
121 this.usage = this.initWorkerUsage()
db0e38ee 122 this.taskFunctionsUsage.clear()
ff128cc9
JB
123 }
124
3f09ed9f
JB
125 /** @inheritdoc */
126 public closeChannel (): void {
7884d183 127 if (this.messageChannel != null) {
3ff2b910
JB
128 this.messageChannel.port1.unref()
129 this.messageChannel.port2.unref()
130 this.messageChannel.port1.close()
131 this.messageChannel.port2.close()
7884d183 132 delete this.messageChannel
3f09ed9f
JB
133 }
134 }
135
ff128cc9 136 /** @inheritdoc */
db0e38ee 137 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 138 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 139 throw new Error(
db0e38ee 140 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
141 )
142 }
b558f6b5 143 if (
6703b9f4
JB
144 Array.isArray(this.info.taskFunctionNames) &&
145 this.info.taskFunctionNames.length < 3
b558f6b5 146 ) {
db0e38ee
JB
147 throw new Error(
148 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
149 )
150 }
151 if (name === DEFAULT_TASK_NAME) {
6703b9f4 152 name = this.info.taskFunctionNames[1]
b558f6b5 153 }
db0e38ee
JB
154 if (!this.taskFunctionsUsage.has(name)) {
155 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 156 }
db0e38ee 157 return this.taskFunctionsUsage.get(name)
4b628b48
JB
158 }
159
adee6053
JB
160 /** @inheritdoc */
161 public deleteTaskFunctionWorkerUsage (name: string): boolean {
162 return this.taskFunctionsUsage.delete(name)
163 }
164
75de9f41 165 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 166 return {
75de9f41
JB
167 id: getWorkerId(worker),
168 type: getWorkerType(worker) as WorkerType,
4b628b48 169 dynamic: false,
7884d183 170 ready: false
4b628b48
JB
171 }
172 }
173
174 private initWorkerUsage (): WorkerUsage {
175 const getTasksQueueSize = (): number => {
dd951876 176 return this.tasksQueue.size
4b628b48 177 }
bf4ef2ca 178 const getTasksQueueMaxSize = (): number => {
dd951876 179 return this.tasksQueue.maxSize
4b628b48
JB
180 }
181 return {
182 tasks: {
183 executed: 0,
184 executing: 0,
185 get queued (): number {
186 return getTasksQueueSize()
187 },
188 get maxQueued (): number {
bf4ef2ca 189 return getTasksQueueMaxSize()
4b628b48 190 },
463226a4 191 sequentiallyStolen: 0,
68cbdc84 192 stolen: 0,
4b628b48
JB
193 failed: 0
194 },
195 runTime: {
c52475b8 196 history: new CircularArray<number>()
4b628b48
JB
197 },
198 waitTime: {
c52475b8 199 history: new CircularArray<number>()
4b628b48
JB
200 },
201 elu: {
202 idle: {
c52475b8 203 history: new CircularArray<number>()
4b628b48
JB
204 },
205 active: {
c52475b8 206 history: new CircularArray<number>()
4b628b48
JB
207 }
208 }
209 }
210 }
211
db0e38ee 212 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
213 const getTaskFunctionQueueSize = (): number => {
214 let taskFunctionQueueSize = 0
b25a42cd 215 for (const task of this.tasksQueue) {
dd92a715 216 if (
e5ece61d 217 (task.name === DEFAULT_TASK_NAME &&
6703b9f4 218 name === (this.info.taskFunctionNames as string[])[1]) ||
e5ece61d 219 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 220 ) {
e5ece61d 221 ++taskFunctionQueueSize
b25a42cd
JB
222 }
223 }
e5ece61d 224 return taskFunctionQueueSize
b25a42cd
JB
225 }
226 return {
227 tasks: {
228 executed: 0,
229 executing: 0,
230 get queued (): number {
e5ece61d 231 return getTaskFunctionQueueSize()
b25a42cd 232 },
463226a4 233 sequentiallyStolen: 0,
68cbdc84 234 stolen: 0,
b25a42cd
JB
235 failed: 0
236 },
237 runTime: {
c52475b8 238 history: new CircularArray<number>()
b25a42cd
JB
239 },
240 waitTime: {
c52475b8 241 history: new CircularArray<number>()
b25a42cd
JB
242 },
243 elu: {
244 idle: {
c52475b8 245 history: new CircularArray<number>()
b25a42cd
JB
246 },
247 active: {
c52475b8 248 history: new CircularArray<number>()
b25a42cd
JB
249 }
250 }
251 }
252 }
4b628b48 253}