refactor: cleanup worker id handling code
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import {
6 type IWorker,
7 type IWorkerNode,
8 type WorkerInfo,
9 type WorkerType,
10 WorkerTypes,
11 type WorkerUsage
12 } from './worker'
13
14 /**
15 * Worker node.
16 *
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
19 */
20 export class WorkerNode<Worker extends IWorker, Data = unknown>
21 implements IWorkerNode<Worker, Data> {
22 public readonly worker: Worker
23 public readonly info: WorkerInfo
24 public usage: WorkerUsage
25 private readonly tasksUsage: Map<string, WorkerUsage>
26 private readonly tasksQueue: Queue<Task<Data>>
27
28 /**
29 * Constructs a new worker node.
30 *
31 * @param worker - The worker.
32 * @param workerType - The worker type.
33 */
34 constructor (worker: Worker, workerType: WorkerType) {
35 this.worker = worker
36 this.info = this.initWorkerInfo(worker, workerType)
37 this.usage = this.initWorkerUsage()
38 this.tasksUsage = new Map<string, WorkerUsage>()
39 this.tasksQueue = new Queue<Task<Data>>()
40 }
41
42 /** @inheritdoc */
43 public tasksQueueSize (): number {
44 return this.tasksQueue.size
45 }
46
47 /**
48 * Tasks queue maximum size.
49 *
50 * @returns The tasks queue maximum size.
51 */
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue.maxSize
54 }
55
56 /** @inheritdoc */
57 public enqueueTask (task: Task<Data>): number {
58 return this.tasksQueue.enqueue(task)
59 }
60
61 /** @inheritdoc */
62 public dequeueTask (): Task<Data> | undefined {
63 return this.tasksQueue.dequeue()
64 }
65
66 /** @inheritdoc */
67 public clearTasksQueue (): void {
68 this.tasksQueue.clear()
69 }
70
71 /** @inheritdoc */
72 public resetUsage (): void {
73 this.usage = this.initWorkerUsage()
74 this.tasksUsage.clear()
75 }
76
77 /** @inheritdoc */
78 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
79 if (!this.tasksUsage.has(name)) {
80 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
81 }
82 return this.tasksUsage.get(name)
83 }
84
85 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
86 return {
87 id: this.getWorkerId(worker, workerType),
88 type: workerType,
89 dynamic: false,
90 ready: false,
91 ...(workerType === WorkerTypes.thread && {
92 messageChannel: new MessageChannel()
93 })
94 }
95 }
96
97 private initWorkerUsage (): WorkerUsage {
98 const getTasksQueueSize = (): number => {
99 return this.tasksQueueSize()
100 }
101 const getTasksQueueMaxSize = (): number => {
102 return this.tasksQueueMaxSize()
103 }
104 return {
105 tasks: {
106 executed: 0,
107 executing: 0,
108 get queued (): number {
109 return getTasksQueueSize()
110 },
111 get maxQueued (): number {
112 return getTasksQueueMaxSize()
113 },
114 failed: 0
115 },
116 runTime: {
117 history: new CircularArray()
118 },
119 waitTime: {
120 history: new CircularArray()
121 },
122 elu: {
123 idle: {
124 history: new CircularArray()
125 },
126 active: {
127 history: new CircularArray()
128 }
129 }
130 }
131 }
132
133 private initTaskWorkerUsage (name: string): WorkerUsage {
134 const getTaskQueueSize = (): number => {
135 let taskQueueSize = 0
136 for (const task of this.tasksQueue) {
137 if (task.name === name) {
138 ++taskQueueSize
139 }
140 }
141 return taskQueueSize
142 }
143 return {
144 tasks: {
145 executed: 0,
146 executing: 0,
147 get queued (): number {
148 return getTaskQueueSize()
149 },
150 failed: 0
151 },
152 runTime: {
153 history: new CircularArray()
154 },
155 waitTime: {
156 history: new CircularArray()
157 },
158 elu: {
159 idle: {
160 history: new CircularArray()
161 },
162 active: {
163 history: new CircularArray()
164 }
165 }
166 }
167 }
168
169 /**
170 * Gets the worker id.
171 *
172 * @param worker - The worker.
173 * @param workerType - The worker type.
174 * @returns The worker id.
175 */
176 private getWorkerId (
177 worker: Worker,
178 workerType: WorkerType
179 ): number | undefined {
180 if (workerType === WorkerTypes.thread) {
181 return worker.threadId
182 } else if (workerType === WorkerTypes.cluster) {
183 return worker.id
184 }
185 }
186 }