a5482dcd29f6778b69c70bf322c2370be13918b3
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME } from '../utils'
6 import {
7 type IWorker,
8 type IWorkerNode,
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13 } from './worker'
14
/**
 * Worker node.
 *
 * Groups a worker with its information, its usage statistics (global and per
 * task function name) and its tasks queue.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  implements IWorkerNode<Worker, Data> {
  /** The wrapped worker. */
  public readonly worker: Worker
  /** Worker information (id, type, dynamic and ready flags). */
  public readonly info: WorkerInfo
  /** Message channel, only created for workers of thread type. */
  public messageChannel?: MessageChannel
  /** Worker usage statistics. */
  public usage: WorkerUsage
  /** Usage statistics per task function name, lazily populated. */
  private readonly tasksUsage: Map<string, WorkerUsage>
  /** Queue of the tasks waiting on this worker node. */
  private readonly tasksQueue: Queue<Task<Data>>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   */
  constructor (worker: Worker, workerType: WorkerType) {
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    // Only thread workers get a dedicated message channel.
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.tasksUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public resetUsage (): void {
    // Start over with a fresh global usage and drop every per task usage.
    this.usage = this.initWorkerUsage()
    this.tasksUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const messageChannel = this.messageChannel
    if (messageChannel != null) {
      messageChannel.port1.unref()
      messageChannel.port2.unref()
      messageChannel.port1.close()
      messageChannel.port2.close()
      // Remove the property so that a subsequent call is a no-op.
      delete this.messageChannel
    }
  }

  /** @inheritdoc */
  public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    // The default task name is tracked under the name found at index 1 of the
    // task function names list, when that list has more than one element.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME && taskFunctions.length > 1
        ? taskFunctions[1]
        : name
    let taskWorkerUsage = this.tasksUsage.get(taskFunctionName)
    if (taskWorkerUsage == null) {
      taskWorkerUsage = this.initTaskWorkerUsage(taskFunctionName)
      this.tasksUsage.set(taskFunctionName, taskWorkerUsage)
    }
    return taskWorkerUsage
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker information, flagged as neither dynamic nor ready.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` task counters
   * are computed on access from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions capture the worker node: inside the object literal
    // getters below, `this` is the literal itself.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function name. Its `queued` counter is
   * computed on access by counting the matching tasks in the tasks queue.
   *
   * @param name - The task function name the usage is tracked under.
   * @returns The initial task worker usage.
   */
  private initTaskWorkerUsage (name: string): WorkerUsage {
    const getTaskQueueSize = (): number => {
      // getTaskWorkerUsage() stores the usage of the default task name under
      // the name at index 1 of the task function names list, so tasks queued
      // under the default task name must be counted in that usage as well.
      const tracksDefaultTaskName = name === this.info.taskFunctions?.[1]
      let taskQueueSize = 0
      for (const task of this.tasksQueue) {
        if (
          task.name === name ||
          (tracksDefaultTaskName && task.name === DEFAULT_TASK_NAME)
        ) {
          ++taskQueueSize
        }
      }
      return taskQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskQueueSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker `threadId` for a thread worker, the worker `id` for a cluster worker, `undefined` otherwise.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    }
    if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    return undefined
  }
}