Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / worker-node.ts
Commit | Line | Data
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
b558f6b5 5import { DEFAULT_TASK_NAME } from '../utils'
4b628b48
JB
6import {
7 type IWorker,
8 type IWorkerNode,
4b628b48
JB
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13} from './worker'
14
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its info, its usage statistics (global and per task
 * function name) and its queue of pending tasks.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** The wrapped worker. */
  public readonly worker: Worker
  /** Worker information built once at construction time. */
  public readonly info: WorkerInfo
  /** Worker-wide usage statistics; replaced wholesale by `resetUsage()`. */
  public usage: WorkerUsage
  /**
   * Task function names. Not assigned by the constructor, hence the
   * definite assignment assertion; readers must cope with it being unset.
   */
  public taskFunctions!: string[]
  /** Usage statistics keyed by task function name, created lazily. */
  private readonly tasksUsage: Map<string, WorkerUsage>
  /** Queue of tasks waiting to be handed to the worker. */
  private readonly tasksQueue: Queue<Task<Data>>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   */
  constructor (worker: Worker, workerType: WorkerType) {
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    this.usage = this.initWorkerUsage()
    this.tasksUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public resetUsage (): void {
    // Fresh worker-wide statistics, and forget every per-task statistics entry.
    this.usage = this.initWorkerUsage()
    this.tasksUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const { messageChannel } = this.info
    // Nothing to do when no message channel is attached to this node.
    if (messageChannel == null) {
      return
    }
    // Unreference both ports, close them, then drop the channel from the
    // worker info so that a later call becomes a no-op.
    messageChannel.port1.unref()
    messageChannel.port2.unref()
    messageChannel.port1.close()
    messageChannel.port2.close()
    delete this.info.messageChannel
  }

  /** @inheritdoc */
  public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
    // The default task name is resolved to the second entry of the task
    // function names when more than one name is known.
    // NOTE(review): presumably index 0 holds the default task name itself - confirm.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME &&
      Array.isArray(this.taskFunctions) &&
      this.taskFunctions.length > 1
        ? this.taskFunctions[1]
        : name
    // Create the per-task statistics on first access.
    let taskWorkerUsage = this.tasksUsage.get(taskFunctionName)
    if (taskWorkerUsage == null) {
      taskWorkerUsage = this.initTaskWorkerUsage(taskFunctionName)
      this.tasksUsage.set(taskFunctionName, taskWorkerUsage)
    }
    return taskWorkerUsage
  }

  /**
   * Builds the initial worker information.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker information, flagged as not dynamic and not ready.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false,
      // A message channel is only created for thread workers; spreading
      // `false` adds no property for the other worker types.
      ...(workerType === WorkerTypes.thread && {
        messageChannel: new MessageChannel()
      })
    }
  }

  /**
   * Builds zeroed worker-wide usage statistics.
   *
   * @returns The worker usage, whose `queued` and `maxQueued` counters are read live from the tasks queue.
   */
  private initWorkerUsage (): WorkerUsage {
    // Inside the accessors below `this` is the `tasks` object literal, so the
    // worker node is captured through arrow functions instead.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function name.
   *
   * @param name - The task function name.
   * @returns The task worker usage, whose `queued` counter counts the queued tasks bearing that name.
   */
  private initTaskWorkerUsage (name: string): WorkerUsage {
    // Counted on each read by walking the tasks queue.
    const getTaskQueueSize = (): number => {
      let taskQueueSize = 0
      for (const task of this.tasksQueue) {
        if (task.name === name) {
          ++taskQueueSize
        }
      }
      return taskQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskQueueSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker id: `threadId` for a thread worker, `id` for a cluster worker, `undefined` otherwise.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    }
    if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    // Neither a thread nor a cluster worker: no id can be derived.
    return undefined
  }
}