fcc091378565a4b78c7f9085619c9017169474d3
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import {
6 type IWorker,
7 type IWorkerNode,
8 type WorkerInfo,
9 type WorkerType,
10 WorkerTypes,
11 type WorkerUsage
12 } from './worker'
13
14 /**
15 * Worker node.
16 *
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
19 */
20 export class WorkerNode<Worker extends IWorker, Data = unknown>
21 implements IWorkerNode<Worker, Data> {
22 public readonly worker: Worker
23 public readonly info: WorkerInfo
24 public usage: WorkerUsage
25 private readonly tasksUsage: Map<string, WorkerUsage>
26 private readonly tasksQueue: Queue<Task<Data>>
27
28 /**
29 * Constructs a new worker node.
30 *
31 * @param worker - The worker.
32 * @param workerType - The worker type.
33 */
34 constructor (worker: Worker, workerType: WorkerType) {
35 this.worker = worker
36 this.info = this.initWorkerInfo(worker, workerType)
37 this.usage = this.initWorkerUsage()
38 this.tasksUsage = new Map<string, WorkerUsage>()
39 this.tasksQueue = new Queue<Task<Data>>()
40 }
41
42 /** @inheritdoc */
43 public tasksQueueSize (): number {
44 return this.tasksQueue.size
45 }
46
47 /**
48 * Tasks queue maximum size.
49 *
50 * @returns The tasks queue maximum size.
51 */
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue.maxSize
54 }
55
56 /** @inheritdoc */
57 public enqueueTask (task: Task<Data>): number {
58 return this.tasksQueue.enqueue(task)
59 }
60
61 /** @inheritdoc */
62 public dequeueTask (): Task<Data> | undefined {
63 return this.tasksQueue.dequeue()
64 }
65
66 /** @inheritdoc */
67 public clearTasksQueue (): void {
68 this.tasksQueue.clear()
69 }
70
71 /** @inheritdoc */
72 public resetUsage (): void {
73 this.usage = this.initWorkerUsage()
74 this.tasksUsage.clear()
75 }
76
77 /** @inheritdoc */
78 public closeChannel (): void {
79 if (this.info.messageChannel != null) {
80 this.info.messageChannel?.port1.close()
81 this.info.messageChannel?.port2.close()
82 delete this.info.messageChannel
83 }
84 }
85
86 /** @inheritdoc */
87 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
88 if (!this.tasksUsage.has(name)) {
89 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
90 }
91 return this.tasksUsage.get(name)
92 }
93
94 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
95 return {
96 id: this.getWorkerId(worker, workerType),
97 type: workerType,
98 dynamic: false,
99 ready: false,
100 ...(workerType === WorkerTypes.thread && {
101 messageChannel: new MessageChannel()
102 })
103 }
104 }
105
106 private initWorkerUsage (): WorkerUsage {
107 const getTasksQueueSize = (): number => {
108 return this.tasksQueueSize()
109 }
110 const getTasksQueueMaxSize = (): number => {
111 return this.tasksQueueMaxSize()
112 }
113 return {
114 tasks: {
115 executed: 0,
116 executing: 0,
117 get queued (): number {
118 return getTasksQueueSize()
119 },
120 get maxQueued (): number {
121 return getTasksQueueMaxSize()
122 },
123 failed: 0
124 },
125 runTime: {
126 history: new CircularArray()
127 },
128 waitTime: {
129 history: new CircularArray()
130 },
131 elu: {
132 idle: {
133 history: new CircularArray()
134 },
135 active: {
136 history: new CircularArray()
137 }
138 }
139 }
140 }
141
142 private initTaskWorkerUsage (name: string): WorkerUsage {
143 const getTaskQueueSize = (): number => {
144 let taskQueueSize = 0
145 for (const task of this.tasksQueue) {
146 if (task.name === name) {
147 ++taskQueueSize
148 }
149 }
150 return taskQueueSize
151 }
152 return {
153 tasks: {
154 executed: 0,
155 executing: 0,
156 get queued (): number {
157 return getTaskQueueSize()
158 },
159 failed: 0
160 },
161 runTime: {
162 history: new CircularArray()
163 },
164 waitTime: {
165 history: new CircularArray()
166 },
167 elu: {
168 idle: {
169 history: new CircularArray()
170 },
171 active: {
172 history: new CircularArray()
173 }
174 }
175 }
176 }
177
178 /**
179 * Gets the worker id.
180 *
181 * @param worker - The worker.
182 * @param workerType - The worker type.
183 * @returns The worker id.
184 */
185 private getWorkerId (
186 worker: Worker,
187 workerType: WorkerType
188 ): number | undefined {
189 if (workerType === WorkerTypes.thread) {
190 return worker.threadId
191 } else if (workerType === WorkerTypes.cluster) {
192 return worker.id
193 }
194 }
195 }