refactor: move message channel property from worker info to node
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
15/**
16 * Worker node.
17 *
18 * @typeParam Worker - Type of worker.
19 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
20 */
4b628b48
JB
21export class WorkerNode<Worker extends IWorker, Data = unknown>
22implements IWorkerNode<Worker, Data> {
23 public readonly worker: Worker
24 public readonly info: WorkerInfo
7884d183 25 public messageChannel?: MessageChannel
4b628b48 26 public usage: WorkerUsage
ff128cc9 27 private readonly tasksUsage: Map<string, WorkerUsage>
4b628b48
JB
28 private readonly tasksQueue: Queue<Task<Data>>
29
60664f48
JB
30 /**
31 * Constructs a new worker node.
32 *
33 * @param worker - The worker.
34 * @param workerType - The worker type.
60664f48 35 */
4b628b48
JB
36 constructor (worker: Worker, workerType: WorkerType) {
37 this.worker = worker
38 this.info = this.initWorkerInfo(worker, workerType)
7884d183
JB
39 if (workerType === WorkerTypes.thread) {
40 this.messageChannel = new MessageChannel()
41 }
4b628b48 42 this.usage = this.initWorkerUsage()
ff128cc9 43 this.tasksUsage = new Map<string, WorkerUsage>()
4b628b48
JB
44 this.tasksQueue = new Queue<Task<Data>>()
45 }
46
47 /** @inheritdoc */
48 public tasksQueueSize (): number {
49 return this.tasksQueue.size
50 }
51
52 /**
eb8afc8a 53 * Tasks queue maximum size.
4b628b48
JB
54 *
55 * @returns The tasks queue maximum size.
56 */
57 private tasksQueueMaxSize (): number {
58 return this.tasksQueue.maxSize
59 }
60
61 /** @inheritdoc */
62 public enqueueTask (task: Task<Data>): number {
63 return this.tasksQueue.enqueue(task)
64 }
65
66 /** @inheritdoc */
67 public dequeueTask (): Task<Data> | undefined {
68 return this.tasksQueue.dequeue()
69 }
70
71 /** @inheritdoc */
72 public clearTasksQueue (): void {
73 this.tasksQueue.clear()
74 }
75
ff128cc9 76 /** @inheritdoc */
4b628b48
JB
77 public resetUsage (): void {
78 this.usage = this.initWorkerUsage()
ff128cc9
JB
79 this.tasksUsage.clear()
80 }
81
3f09ed9f
JB
82 /** @inheritdoc */
83 public closeChannel (): void {
7884d183
JB
84 if (this.messageChannel != null) {
85 this.messageChannel?.port1.unref()
86 this.messageChannel?.port2.unref()
87 this.messageChannel?.port1.close()
88 this.messageChannel?.port2.close()
89 delete this.messageChannel
3f09ed9f
JB
90 }
91 }
92
ff128cc9 93 /** @inheritdoc */
ce1b31be 94 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
a5d15204 95 if (!Array.isArray(this.info.taskFunctions)) {
71b2b6d8 96 throw new Error(
a5d15204 97 `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
98 )
99 }
b558f6b5
JB
100 if (
101 name === DEFAULT_TASK_NAME &&
71b2b6d8
JB
102 Array.isArray(this.info.taskFunctions) &&
103 this.info.taskFunctions.length > 1
b558f6b5 104 ) {
71b2b6d8 105 name = this.info.taskFunctions[1]
b558f6b5 106 }
ff128cc9 107 if (!this.tasksUsage.has(name)) {
b25a42cd 108 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
ff128cc9
JB
109 }
110 return this.tasksUsage.get(name)
4b628b48
JB
111 }
112
113 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
114 return {
115 id: this.getWorkerId(worker, workerType),
116 type: workerType,
117 dynamic: false,
7884d183 118 ready: false
4b628b48
JB
119 }
120 }
121
122 private initWorkerUsage (): WorkerUsage {
123 const getTasksQueueSize = (): number => {
124 return this.tasksQueueSize()
125 }
bf4ef2ca 126 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
127 return this.tasksQueueMaxSize()
128 }
129 return {
130 tasks: {
131 executed: 0,
132 executing: 0,
133 get queued (): number {
134 return getTasksQueueSize()
135 },
136 get maxQueued (): number {
bf4ef2ca 137 return getTasksQueueMaxSize()
4b628b48
JB
138 },
139 failed: 0
140 },
141 runTime: {
142 history: new CircularArray()
143 },
144 waitTime: {
145 history: new CircularArray()
146 },
147 elu: {
148 idle: {
149 history: new CircularArray()
150 },
151 active: {
152 history: new CircularArray()
153 }
154 }
155 }
156 }
157
b25a42cd
JB
158 private initTaskWorkerUsage (name: string): WorkerUsage {
159 const getTaskQueueSize = (): number => {
160 let taskQueueSize = 0
161 for (const task of this.tasksQueue) {
162 if (task.name === name) {
163 ++taskQueueSize
164 }
165 }
166 return taskQueueSize
167 }
168 return {
169 tasks: {
170 executed: 0,
171 executing: 0,
172 get queued (): number {
173 return getTaskQueueSize()
174 },
175 failed: 0
176 },
177 runTime: {
178 history: new CircularArray()
179 },
180 waitTime: {
181 history: new CircularArray()
182 },
183 elu: {
184 idle: {
185 history: new CircularArray()
186 },
187 active: {
188 history: new CircularArray()
189 }
190 }
191 }
192 }
193
4b628b48
JB
194 /**
195 * Gets the worker id.
196 *
197 * @param worker - The worker.
60664f48 198 * @param workerType - The worker type.
4b628b48
JB
199 * @returns The worker id.
200 */
201 private getWorkerId (
202 worker: Worker,
203 workerType: WorkerType
204 ): number | undefined {
205 if (workerType === WorkerTypes.thread) {
206 return worker.threadId
207 } else if (workerType === WorkerTypes.cluster) {
208 return worker.id
209 }
210 }
211}