refactor: add sanity check at getting worker info
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
4b628b48
JB
5import {
6 type IWorker,
7 type IWorkerNode,
4b628b48
JB
8 type WorkerInfo,
9 type WorkerType,
10 WorkerTypes,
11 type WorkerUsage
12} from './worker'
13
60664f48
JB
14/**
15 * Worker node.
16 *
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
19 */
4b628b48
JB
20export class WorkerNode<Worker extends IWorker, Data = unknown>
21implements IWorkerNode<Worker, Data> {
22 public readonly worker: Worker
23 public readonly info: WorkerInfo
24 public usage: WorkerUsage
ff128cc9 25 private readonly tasksUsage: Map<string, WorkerUsage>
4b628b48
JB
26 private readonly tasksQueue: Queue<Task<Data>>
27
60664f48
JB
28 /**
29 * Constructs a new worker node.
30 *
31 * @param worker - The worker.
32 * @param workerType - The worker type.
60664f48 33 */
4b628b48
JB
34 constructor (worker: Worker, workerType: WorkerType) {
35 this.worker = worker
36 this.info = this.initWorkerInfo(worker, workerType)
37 this.usage = this.initWorkerUsage()
ff128cc9 38 this.tasksUsage = new Map<string, WorkerUsage>()
4b628b48
JB
39 this.tasksQueue = new Queue<Task<Data>>()
40 }
41
42 /** @inheritdoc */
43 public tasksQueueSize (): number {
44 return this.tasksQueue.size
45 }
46
47 /**
eb8afc8a 48 * Tasks queue maximum size.
4b628b48
JB
49 *
50 * @returns The tasks queue maximum size.
51 */
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue.maxSize
54 }
55
56 /** @inheritdoc */
57 public enqueueTask (task: Task<Data>): number {
58 return this.tasksQueue.enqueue(task)
59 }
60
61 /** @inheritdoc */
62 public dequeueTask (): Task<Data> | undefined {
63 return this.tasksQueue.dequeue()
64 }
65
66 /** @inheritdoc */
67 public clearTasksQueue (): void {
68 this.tasksQueue.clear()
69 }
70
ff128cc9 71 /** @inheritdoc */
4b628b48
JB
72 public resetUsage (): void {
73 this.usage = this.initWorkerUsage()
ff128cc9
JB
74 this.tasksUsage.clear()
75 }
76
77 /** @inheritdoc */
ce1b31be 78 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
ff128cc9 79 if (!this.tasksUsage.has(name)) {
b25a42cd 80 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
ff128cc9
JB
81 }
82 return this.tasksUsage.get(name)
4b628b48
JB
83 }
84
85 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
86 return {
87 id: this.getWorkerId(worker, workerType),
88 type: workerType,
89 dynamic: false,
85aeb3f3
JB
90 ready: false,
91 ...(workerType === WorkerTypes.thread && {
92 messageChannel: new MessageChannel()
93 })
4b628b48
JB
94 }
95 }
96
97 private initWorkerUsage (): WorkerUsage {
98 const getTasksQueueSize = (): number => {
99 return this.tasksQueueSize()
100 }
bf4ef2ca 101 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
102 return this.tasksQueueMaxSize()
103 }
104 return {
105 tasks: {
106 executed: 0,
107 executing: 0,
108 get queued (): number {
109 return getTasksQueueSize()
110 },
111 get maxQueued (): number {
bf4ef2ca 112 return getTasksQueueMaxSize()
4b628b48
JB
113 },
114 failed: 0
115 },
116 runTime: {
117 history: new CircularArray()
118 },
119 waitTime: {
120 history: new CircularArray()
121 },
122 elu: {
123 idle: {
124 history: new CircularArray()
125 },
126 active: {
127 history: new CircularArray()
128 }
129 }
130 }
131 }
132
b25a42cd
JB
133 private initTaskWorkerUsage (name: string): WorkerUsage {
134 const getTaskQueueSize = (): number => {
135 let taskQueueSize = 0
136 for (const task of this.tasksQueue) {
137 if (task.name === name) {
138 ++taskQueueSize
139 }
140 }
141 return taskQueueSize
142 }
143 return {
144 tasks: {
145 executed: 0,
146 executing: 0,
147 get queued (): number {
148 return getTaskQueueSize()
149 },
150 failed: 0
151 },
152 runTime: {
153 history: new CircularArray()
154 },
155 waitTime: {
156 history: new CircularArray()
157 },
158 elu: {
159 idle: {
160 history: new CircularArray()
161 },
162 active: {
163 history: new CircularArray()
164 }
165 }
166 }
167 }
168
4b628b48
JB
169 /**
170 * Gets the worker id.
171 *
172 * @param worker - The worker.
60664f48 173 * @param workerType - The worker type.
4b628b48
JB
174 * @returns The worker id.
175 */
176 private getWorkerId (
177 worker: Worker,
178 workerType: WorkerType
179 ): number | undefined {
180 if (workerType === WorkerTypes.thread) {
181 return worker.threadId
182 } else if (workerType === WorkerTypes.cluster) {
183 return worker.id
184 }
185 }
186}