fix: fix task function names property in worker-node
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME } from '../utils'
6 import {
7 type IWorker,
8 type IWorkerNode,
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13 } from './worker'
14
15 /**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
21 export class WorkerNode<Worker extends IWorker, Data = unknown>
22 implements IWorkerNode<Worker, Data> {
23 public readonly worker: Worker
24 public readonly info: WorkerInfo
25 public usage: WorkerUsage
26 private readonly tasksUsage: Map<string, WorkerUsage>
27 private readonly tasksQueue: Queue<Task<Data>>
28
29 /**
30 * Constructs a new worker node.
31 *
32 * @param worker - The worker.
33 * @param workerType - The worker type.
34 */
35 constructor (worker: Worker, workerType: WorkerType) {
36 this.worker = worker
37 this.info = this.initWorkerInfo(worker, workerType)
38 this.usage = this.initWorkerUsage()
39 this.tasksUsage = new Map<string, WorkerUsage>()
40 this.tasksQueue = new Queue<Task<Data>>()
41 }
42
43 /** @inheritdoc */
44 public tasksQueueSize (): number {
45 return this.tasksQueue.size
46 }
47
48 /**
49 * Tasks queue maximum size.
50 *
51 * @returns The tasks queue maximum size.
52 */
53 private tasksQueueMaxSize (): number {
54 return this.tasksQueue.maxSize
55 }
56
57 /** @inheritdoc */
58 public enqueueTask (task: Task<Data>): number {
59 return this.tasksQueue.enqueue(task)
60 }
61
62 /** @inheritdoc */
63 public dequeueTask (): Task<Data> | undefined {
64 return this.tasksQueue.dequeue()
65 }
66
67 /** @inheritdoc */
68 public clearTasksQueue (): void {
69 this.tasksQueue.clear()
70 }
71
72 /** @inheritdoc */
73 public resetUsage (): void {
74 this.usage = this.initWorkerUsage()
75 this.tasksUsage.clear()
76 }
77
78 /** @inheritdoc */
79 public closeChannel (): void {
80 if (this.info.messageChannel != null) {
81 this.info.messageChannel?.port1.unref()
82 this.info.messageChannel?.port2.unref()
83 this.info.messageChannel?.port1.close()
84 this.info.messageChannel?.port2.close()
85 delete this.info.messageChannel
86 }
87 }
88
89 /** @inheritdoc */
90 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
91 if (name === DEFAULT_TASK_NAME && !Array.isArray(this.info.taskFunctions)) {
92 throw new Error(
93 'Cannot get task worker usage for default task function name when task function names list is not yet defined'
94 )
95 }
96 if (
97 name === DEFAULT_TASK_NAME &&
98 Array.isArray(this.info.taskFunctions) &&
99 this.info.taskFunctions.length > 1
100 ) {
101 name = this.info.taskFunctions[1]
102 }
103 if (!this.tasksUsage.has(name)) {
104 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
105 }
106 return this.tasksUsage.get(name)
107 }
108
109 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
110 return {
111 id: this.getWorkerId(worker, workerType),
112 type: workerType,
113 dynamic: false,
114 ready: false,
115 ...(workerType === WorkerTypes.thread && {
116 messageChannel: new MessageChannel()
117 })
118 }
119 }
120
121 private initWorkerUsage (): WorkerUsage {
122 const getTasksQueueSize = (): number => {
123 return this.tasksQueueSize()
124 }
125 const getTasksQueueMaxSize = (): number => {
126 return this.tasksQueueMaxSize()
127 }
128 return {
129 tasks: {
130 executed: 0,
131 executing: 0,
132 get queued (): number {
133 return getTasksQueueSize()
134 },
135 get maxQueued (): number {
136 return getTasksQueueMaxSize()
137 },
138 failed: 0
139 },
140 runTime: {
141 history: new CircularArray()
142 },
143 waitTime: {
144 history: new CircularArray()
145 },
146 elu: {
147 idle: {
148 history: new CircularArray()
149 },
150 active: {
151 history: new CircularArray()
152 }
153 }
154 }
155 }
156
157 private initTaskWorkerUsage (name: string): WorkerUsage {
158 const getTaskQueueSize = (): number => {
159 let taskQueueSize = 0
160 for (const task of this.tasksQueue) {
161 if (task.name === name) {
162 ++taskQueueSize
163 }
164 }
165 return taskQueueSize
166 }
167 return {
168 tasks: {
169 executed: 0,
170 executing: 0,
171 get queued (): number {
172 return getTaskQueueSize()
173 },
174 failed: 0
175 },
176 runTime: {
177 history: new CircularArray()
178 },
179 waitTime: {
180 history: new CircularArray()
181 },
182 elu: {
183 idle: {
184 history: new CircularArray()
185 },
186 active: {
187 history: new CircularArray()
188 }
189 }
190 }
191 }
192
193 /**
194 * Gets the worker id.
195 *
196 * @param worker - The worker.
197 * @param workerType - The worker type.
198 * @returns The worker id.
199 */
200 private getWorkerId (
201 worker: Worker,
202 workerType: WorkerType
203 ): number | undefined {
204 if (workerType === WorkerTypes.thread) {
205 return worker.threadId
206 } else if (workerType === WorkerTypes.cluster) {
207 return worker.id
208 }
209 }
210 }