fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
5 import { Deque } from '../deque'
6 import {
7 type IWorker,
8 type IWorkerNode,
9 type StrategyData,
10 type WorkerInfo,
11 type WorkerNodeEventDetail,
12 type WorkerType,
13 WorkerTypes,
14 type WorkerUsage
15 } from './worker'
16 import { checkWorkerNodeArguments } from './utils'
17
18 /**
19 * Worker node.
20 *
21 * @typeParam Worker - Type of worker.
22 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
23 */
24 export class WorkerNode<Worker extends IWorker, Data = unknown>
25 extends EventTarget
26 implements IWorkerNode<Worker, Data> {
27 /** @inheritdoc */
28 public readonly worker: Worker
29 /** @inheritdoc */
30 public readonly info: WorkerInfo
31 /** @inheritdoc */
32 public usage: WorkerUsage
33 /** @inheritdoc */
34 public strategyData?: StrategyData
35 /** @inheritdoc */
36 public messageChannel?: MessageChannel
37 /** @inheritdoc */
38 public tasksQueueBackPressureSize: number
39 private readonly tasksQueue: Deque<Task<Data>>
40 private onBackPressureStarted: boolean
41 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
42
43 /**
44 * Constructs a new worker node.
45 *
46 * @param worker - The worker.
47 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
48 */
49 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
50 super()
51 checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
52 this.worker = worker
53 this.info = this.initWorkerInfo(worker)
54 this.usage = this.initWorkerUsage()
55 if (this.info.type === WorkerTypes.thread) {
56 this.messageChannel = new MessageChannel()
57 }
58 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
59 this.tasksQueue = new Deque<Task<Data>>()
60 this.onBackPressureStarted = false
61 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
62 }
63
64 /** @inheritdoc */
65 public tasksQueueSize (): number {
66 return this.tasksQueue.size
67 }
68
69 /** @inheritdoc */
70 public enqueueTask (task: Task<Data>): number {
71 const tasksQueueSize = this.tasksQueue.push(task)
72 if (this.hasBackPressure() && !this.onBackPressureStarted) {
73 this.onBackPressureStarted = true
74 this.dispatchEvent(
75 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
76 detail: { workerId: this.info.id as number }
77 })
78 )
79 this.onBackPressureStarted = false
80 }
81 return tasksQueueSize
82 }
83
84 /** @inheritdoc */
85 public unshiftTask (task: Task<Data>): number {
86 const tasksQueueSize = this.tasksQueue.unshift(task)
87 if (this.hasBackPressure() && !this.onBackPressureStarted) {
88 this.onBackPressureStarted = true
89 this.dispatchEvent(
90 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
91 detail: { workerId: this.info.id as number }
92 })
93 )
94 this.onBackPressureStarted = false
95 }
96 return tasksQueueSize
97 }
98
99 /** @inheritdoc */
100 public dequeueTask (): Task<Data> | undefined {
101 return this.tasksQueue.shift()
102 }
103
104 /** @inheritdoc */
105 public popTask (): Task<Data> | undefined {
106 return this.tasksQueue.pop()
107 }
108
109 /** @inheritdoc */
110 public clearTasksQueue (): void {
111 this.tasksQueue.clear()
112 }
113
114 /** @inheritdoc */
115 public hasBackPressure (): boolean {
116 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
117 }
118
119 /** @inheritdoc */
120 public resetUsage (): void {
121 this.usage = this.initWorkerUsage()
122 this.taskFunctionsUsage.clear()
123 }
124
125 /** @inheritdoc */
126 public closeChannel (): void {
127 if (this.messageChannel != null) {
128 this.messageChannel.port1.unref()
129 this.messageChannel.port2.unref()
130 this.messageChannel.port1.close()
131 this.messageChannel.port2.close()
132 delete this.messageChannel
133 }
134 }
135
136 /** @inheritdoc */
137 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
138 if (!Array.isArray(this.info.taskFunctionNames)) {
139 throw new Error(
140 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
141 )
142 }
143 if (
144 Array.isArray(this.info.taskFunctionNames) &&
145 this.info.taskFunctionNames.length < 3
146 ) {
147 throw new Error(
148 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
149 )
150 }
151 if (name === DEFAULT_TASK_NAME) {
152 name = this.info.taskFunctionNames[1]
153 }
154 if (!this.taskFunctionsUsage.has(name)) {
155 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
156 }
157 return this.taskFunctionsUsage.get(name)
158 }
159
160 /** @inheritdoc */
161 public deleteTaskFunctionWorkerUsage (name: string): boolean {
162 return this.taskFunctionsUsage.delete(name)
163 }
164
165 private initWorkerInfo (worker: Worker): WorkerInfo {
166 return {
167 id: getWorkerId(worker),
168 type: getWorkerType(worker) as WorkerType,
169 dynamic: false,
170 ready: false
171 }
172 }
173
174 private initWorkerUsage (): WorkerUsage {
175 const getTasksQueueSize = (): number => {
176 return this.tasksQueue.size
177 }
178 const getTasksQueueMaxSize = (): number => {
179 return this.tasksQueue.maxSize
180 }
181 return {
182 tasks: {
183 executed: 0,
184 executing: 0,
185 get queued (): number {
186 return getTasksQueueSize()
187 },
188 get maxQueued (): number {
189 return getTasksQueueMaxSize()
190 },
191 sequentiallyStolen: 0,
192 stolen: 0,
193 failed: 0
194 },
195 runTime: {
196 history: new CircularArray<number>()
197 },
198 waitTime: {
199 history: new CircularArray<number>()
200 },
201 elu: {
202 idle: {
203 history: new CircularArray<number>()
204 },
205 active: {
206 history: new CircularArray<number>()
207 }
208 }
209 }
210 }
211
212 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
213 const getTaskFunctionQueueSize = (): number => {
214 let taskFunctionQueueSize = 0
215 for (const task of this.tasksQueue) {
216 if (
217 (task.name === DEFAULT_TASK_NAME &&
218 name === (this.info.taskFunctionNames as string[])[1]) ||
219 (task.name !== DEFAULT_TASK_NAME && name === task.name)
220 ) {
221 ++taskFunctionQueueSize
222 }
223 }
224 return taskFunctionQueueSize
225 }
226 return {
227 tasks: {
228 executed: 0,
229 executing: 0,
230 get queued (): number {
231 return getTaskFunctionQueueSize()
232 },
233 sequentiallyStolen: 0,
234 stolen: 0,
235 failed: 0
236 },
237 runTime: {
238 history: new CircularArray<number>()
239 },
240 waitTime: {
241 history: new CircularArray<number>()
242 },
243 elu: {
244 idle: {
245 history: new CircularArray<number>()
246 },
247 active: {
248 history: new CircularArray<number>()
249 }
250 }
251 }
252 }
253 }