fix: unref() message port at worker exit
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import {
6 type IWorker,
7 type IWorkerNode,
8 type WorkerInfo,
9 type WorkerType,
10 WorkerTypes,
11 type WorkerUsage
12 } from './worker'
13
14 /**
15 * Worker node.
16 *
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
19 */
20 export class WorkerNode<Worker extends IWorker, Data = unknown>
21 implements IWorkerNode<Worker, Data> {
22 public readonly worker: Worker
23 public readonly info: WorkerInfo
24 public usage: WorkerUsage
25 private readonly tasksUsage: Map<string, WorkerUsage>
26 private readonly tasksQueue: Queue<Task<Data>>
27
28 /**
29 * Constructs a new worker node.
30 *
31 * @param worker - The worker.
32 * @param workerType - The worker type.
33 */
34 constructor (worker: Worker, workerType: WorkerType) {
35 this.worker = worker
36 this.info = this.initWorkerInfo(worker, workerType)
37 this.usage = this.initWorkerUsage()
38 this.tasksUsage = new Map<string, WorkerUsage>()
39 this.tasksQueue = new Queue<Task<Data>>()
40 }
41
42 /** @inheritdoc */
43 public tasksQueueSize (): number {
44 return this.tasksQueue.size
45 }
46
47 /**
48 * Tasks queue maximum size.
49 *
50 * @returns The tasks queue maximum size.
51 */
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue.maxSize
54 }
55
56 /** @inheritdoc */
57 public enqueueTask (task: Task<Data>): number {
58 return this.tasksQueue.enqueue(task)
59 }
60
61 /** @inheritdoc */
62 public dequeueTask (): Task<Data> | undefined {
63 return this.tasksQueue.dequeue()
64 }
65
66 /** @inheritdoc */
67 public clearTasksQueue (): void {
68 this.tasksQueue.clear()
69 }
70
71 /** @inheritdoc */
72 public resetUsage (): void {
73 this.usage = this.initWorkerUsage()
74 this.tasksUsage.clear()
75 }
76
77 /** @inheritdoc */
78 public closeChannel (): void {
79 if (this.info.messageChannel != null) {
80 this.info.messageChannel?.port1.unref()
81 this.info.messageChannel?.port2.unref()
82 this.info.messageChannel?.port1.close()
83 this.info.messageChannel?.port2.close()
84 delete this.info.messageChannel
85 }
86 }
87
88 /** @inheritdoc */
89 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
90 if (!this.tasksUsage.has(name)) {
91 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
92 }
93 return this.tasksUsage.get(name)
94 }
95
96 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
97 return {
98 id: this.getWorkerId(worker, workerType),
99 type: workerType,
100 dynamic: false,
101 ready: false,
102 ...(workerType === WorkerTypes.thread && {
103 messageChannel: new MessageChannel()
104 })
105 }
106 }
107
108 private initWorkerUsage (): WorkerUsage {
109 const getTasksQueueSize = (): number => {
110 return this.tasksQueueSize()
111 }
112 const getTasksQueueMaxSize = (): number => {
113 return this.tasksQueueMaxSize()
114 }
115 return {
116 tasks: {
117 executed: 0,
118 executing: 0,
119 get queued (): number {
120 return getTasksQueueSize()
121 },
122 get maxQueued (): number {
123 return getTasksQueueMaxSize()
124 },
125 failed: 0
126 },
127 runTime: {
128 history: new CircularArray()
129 },
130 waitTime: {
131 history: new CircularArray()
132 },
133 elu: {
134 idle: {
135 history: new CircularArray()
136 },
137 active: {
138 history: new CircularArray()
139 }
140 }
141 }
142 }
143
144 private initTaskWorkerUsage (name: string): WorkerUsage {
145 const getTaskQueueSize = (): number => {
146 let taskQueueSize = 0
147 for (const task of this.tasksQueue) {
148 if (task.name === name) {
149 ++taskQueueSize
150 }
151 }
152 return taskQueueSize
153 }
154 return {
155 tasks: {
156 executed: 0,
157 executing: 0,
158 get queued (): number {
159 return getTaskQueueSize()
160 },
161 failed: 0
162 },
163 runTime: {
164 history: new CircularArray()
165 },
166 waitTime: {
167 history: new CircularArray()
168 },
169 elu: {
170 idle: {
171 history: new CircularArray()
172 },
173 active: {
174 history: new CircularArray()
175 }
176 }
177 }
178 }
179
180 /**
181 * Gets the worker id.
182 *
183 * @param worker - The worker.
184 * @param workerType - The worker type.
185 * @returns The worker id.
186 */
187 private getWorkerId (
188 worker: Worker,
189 workerType: WorkerType
190 ): number | undefined {
191 if (workerType === WorkerTypes.thread) {
192 return worker.threadId
193 } else if (workerType === WorkerTypes.cluster) {
194 return worker.id
195 }
196 }
197 }