fix: fix task function names property in worker-node
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
15/**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
4b628b48
JB
21export class WorkerNode<Worker extends IWorker, Data = unknown>
22implements IWorkerNode<Worker, Data> {
23 public readonly worker: Worker
24 public readonly info: WorkerInfo
25 public usage: WorkerUsage
ff128cc9 26 private readonly tasksUsage: Map<string, WorkerUsage>
4b628b48
JB
27 private readonly tasksQueue: Queue<Task<Data>>
28
60664f48
JB
29 /**
30 * Constructs a new worker node.
31 *
32 * @param worker - The worker.
33 * @param workerType - The worker type.
60664f48 34 */
4b628b48
JB
35 constructor (worker: Worker, workerType: WorkerType) {
36 this.worker = worker
37 this.info = this.initWorkerInfo(worker, workerType)
38 this.usage = this.initWorkerUsage()
ff128cc9 39 this.tasksUsage = new Map<string, WorkerUsage>()
4b628b48
JB
40 this.tasksQueue = new Queue<Task<Data>>()
41 }
42
43 /** @inheritdoc */
44 public tasksQueueSize (): number {
45 return this.tasksQueue.size
46 }
47
48 /**
eb8afc8a 49 * Tasks queue maximum size.
4b628b48
JB
50 *
51 * @returns The tasks queue maximum size.
52 */
53 private tasksQueueMaxSize (): number {
54 return this.tasksQueue.maxSize
55 }
56
57 /** @inheritdoc */
58 public enqueueTask (task: Task<Data>): number {
59 return this.tasksQueue.enqueue(task)
60 }
61
62 /** @inheritdoc */
63 public dequeueTask (): Task<Data> | undefined {
64 return this.tasksQueue.dequeue()
65 }
66
67 /** @inheritdoc */
68 public clearTasksQueue (): void {
69 this.tasksQueue.clear()
70 }
71
ff128cc9 72 /** @inheritdoc */
4b628b48
JB
73 public resetUsage (): void {
74 this.usage = this.initWorkerUsage()
ff128cc9
JB
75 this.tasksUsage.clear()
76 }
77
3f09ed9f
JB
78 /** @inheritdoc */
79 public closeChannel (): void {
80 if (this.info.messageChannel != null) {
984dc9c8
JB
81 this.info.messageChannel?.port1.unref()
82 this.info.messageChannel?.port2.unref()
3f09ed9f
JB
83 this.info.messageChannel?.port1.close()
84 this.info.messageChannel?.port2.close()
85 delete this.info.messageChannel
86 }
87 }
88
ff128cc9 89 /** @inheritdoc */
ce1b31be 90 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
71b2b6d8
JB
91 if (name === DEFAULT_TASK_NAME && !Array.isArray(this.info.taskFunctions)) {
92 throw new Error(
93 'Cannot get task worker usage for default task function name when task function names list is not yet defined'
94 )
95 }
b558f6b5
JB
96 if (
97 name === DEFAULT_TASK_NAME &&
71b2b6d8
JB
98 Array.isArray(this.info.taskFunctions) &&
99 this.info.taskFunctions.length > 1
b558f6b5 100 ) {
71b2b6d8 101 name = this.info.taskFunctions[1]
b558f6b5 102 }
ff128cc9 103 if (!this.tasksUsage.has(name)) {
b25a42cd 104 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
ff128cc9
JB
105 }
106 return this.tasksUsage.get(name)
4b628b48
JB
107 }
108
109 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
110 return {
111 id: this.getWorkerId(worker, workerType),
112 type: workerType,
113 dynamic: false,
85aeb3f3
JB
114 ready: false,
115 ...(workerType === WorkerTypes.thread && {
116 messageChannel: new MessageChannel()
117 })
4b628b48
JB
118 }
119 }
120
121 private initWorkerUsage (): WorkerUsage {
122 const getTasksQueueSize = (): number => {
123 return this.tasksQueueSize()
124 }
bf4ef2ca 125 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
126 return this.tasksQueueMaxSize()
127 }
128 return {
129 tasks: {
130 executed: 0,
131 executing: 0,
132 get queued (): number {
133 return getTasksQueueSize()
134 },
135 get maxQueued (): number {
bf4ef2ca 136 return getTasksQueueMaxSize()
4b628b48
JB
137 },
138 failed: 0
139 },
140 runTime: {
141 history: new CircularArray()
142 },
143 waitTime: {
144 history: new CircularArray()
145 },
146 elu: {
147 idle: {
148 history: new CircularArray()
149 },
150 active: {
151 history: new CircularArray()
152 }
153 }
154 }
155 }
156
b25a42cd
JB
157 private initTaskWorkerUsage (name: string): WorkerUsage {
158 const getTaskQueueSize = (): number => {
159 let taskQueueSize = 0
160 for (const task of this.tasksQueue) {
161 if (task.name === name) {
162 ++taskQueueSize
163 }
164 }
165 return taskQueueSize
166 }
167 return {
168 tasks: {
169 executed: 0,
170 executing: 0,
171 get queued (): number {
172 return getTaskQueueSize()
173 },
174 failed: 0
175 },
176 runTime: {
177 history: new CircularArray()
178 },
179 waitTime: {
180 history: new CircularArray()
181 },
182 elu: {
183 idle: {
184 history: new CircularArray()
185 },
186 active: {
187 history: new CircularArray()
188 }
189 }
190 }
191 }
192
4b628b48
JB
193 /**
194 * Gets the worker id.
195 *
196 * @param worker - The worker.
60664f48 197 * @param workerType - The worker type.
4b628b48
JB
198 * @returns The worker id.
199 */
200 private getWorkerId (
201 worker: Worker,
202 workerType: WorkerType
203 ): number | undefined {
204 if (workerType === WorkerTypes.thread) {
205 return worker.threadId
206 } else if (workerType === WorkerTypes.cluster) {
207 return worker.id
208 }
209 }
210}