fix: unref() message port at worker exit
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48
JB
2import { CircularArray } from '../circular-array'
3import { Queue } from '../queue'
5c4d16da 4import type { Task } from '../utility-types'
4b628b48
JB
5import {
6 type IWorker,
7 type IWorkerNode,
4b628b48
JB
8 type WorkerInfo,
9 type WorkerType,
10 WorkerTypes,
11 type WorkerUsage
12} from './worker'
13
60664f48
JB
14/**
15 * Worker node.
16 *
17 * @typeParam Worker - Type of worker.
18 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
19 */
4b628b48
JB
20export class WorkerNode<Worker extends IWorker, Data = unknown>
21implements IWorkerNode<Worker, Data> {
22 public readonly worker: Worker
23 public readonly info: WorkerInfo
24 public usage: WorkerUsage
ff128cc9 25 private readonly tasksUsage: Map<string, WorkerUsage>
4b628b48
JB
26 private readonly tasksQueue: Queue<Task<Data>>
27
60664f48
JB
28 /**
29 * Constructs a new worker node.
30 *
31 * @param worker - The worker.
32 * @param workerType - The worker type.
60664f48 33 */
4b628b48
JB
34 constructor (worker: Worker, workerType: WorkerType) {
35 this.worker = worker
36 this.info = this.initWorkerInfo(worker, workerType)
37 this.usage = this.initWorkerUsage()
ff128cc9 38 this.tasksUsage = new Map<string, WorkerUsage>()
4b628b48
JB
39 this.tasksQueue = new Queue<Task<Data>>()
40 }
41
42 /** @inheritdoc */
43 public tasksQueueSize (): number {
44 return this.tasksQueue.size
45 }
46
47 /**
eb8afc8a 48 * Tasks queue maximum size.
4b628b48
JB
49 *
50 * @returns The tasks queue maximum size.
51 */
52 private tasksQueueMaxSize (): number {
53 return this.tasksQueue.maxSize
54 }
55
56 /** @inheritdoc */
57 public enqueueTask (task: Task<Data>): number {
58 return this.tasksQueue.enqueue(task)
59 }
60
61 /** @inheritdoc */
62 public dequeueTask (): Task<Data> | undefined {
63 return this.tasksQueue.dequeue()
64 }
65
66 /** @inheritdoc */
67 public clearTasksQueue (): void {
68 this.tasksQueue.clear()
69 }
70
ff128cc9 71 /** @inheritdoc */
4b628b48
JB
72 public resetUsage (): void {
73 this.usage = this.initWorkerUsage()
ff128cc9
JB
74 this.tasksUsage.clear()
75 }
76
3f09ed9f
JB
77 /** @inheritdoc */
78 public closeChannel (): void {
79 if (this.info.messageChannel != null) {
984dc9c8
JB
80 this.info.messageChannel?.port1.unref()
81 this.info.messageChannel?.port2.unref()
3f09ed9f
JB
82 this.info.messageChannel?.port1.close()
83 this.info.messageChannel?.port2.close()
84 delete this.info.messageChannel
85 }
86 }
87
ff128cc9 88 /** @inheritdoc */
ce1b31be 89 public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
ff128cc9 90 if (!this.tasksUsage.has(name)) {
b25a42cd 91 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
ff128cc9
JB
92 }
93 return this.tasksUsage.get(name)
4b628b48
JB
94 }
95
96 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
97 return {
98 id: this.getWorkerId(worker, workerType),
99 type: workerType,
100 dynamic: false,
85aeb3f3
JB
101 ready: false,
102 ...(workerType === WorkerTypes.thread && {
103 messageChannel: new MessageChannel()
104 })
4b628b48
JB
105 }
106 }
107
108 private initWorkerUsage (): WorkerUsage {
109 const getTasksQueueSize = (): number => {
110 return this.tasksQueueSize()
111 }
bf4ef2ca 112 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
113 return this.tasksQueueMaxSize()
114 }
115 return {
116 tasks: {
117 executed: 0,
118 executing: 0,
119 get queued (): number {
120 return getTasksQueueSize()
121 },
122 get maxQueued (): number {
bf4ef2ca 123 return getTasksQueueMaxSize()
4b628b48
JB
124 },
125 failed: 0
126 },
127 runTime: {
128 history: new CircularArray()
129 },
130 waitTime: {
131 history: new CircularArray()
132 },
133 elu: {
134 idle: {
135 history: new CircularArray()
136 },
137 active: {
138 history: new CircularArray()
139 }
140 }
141 }
142 }
143
b25a42cd
JB
144 private initTaskWorkerUsage (name: string): WorkerUsage {
145 const getTaskQueueSize = (): number => {
146 let taskQueueSize = 0
147 for (const task of this.tasksQueue) {
148 if (task.name === name) {
149 ++taskQueueSize
150 }
151 }
152 return taskQueueSize
153 }
154 return {
155 tasks: {
156 executed: 0,
157 executing: 0,
158 get queued (): number {
159 return getTaskQueueSize()
160 },
161 failed: 0
162 },
163 runTime: {
164 history: new CircularArray()
165 },
166 waitTime: {
167 history: new CircularArray()
168 },
169 elu: {
170 idle: {
171 history: new CircularArray()
172 },
173 active: {
174 history: new CircularArray()
175 }
176 }
177 }
178 }
179
4b628b48
JB
180 /**
181 * Gets the worker id.
182 *
183 * @param worker - The worker.
60664f48 184 * @param workerType - The worker type.
4b628b48
JB
185 * @returns The worker id.
186 */
187 private getWorkerId (
188 worker: Worker,
189 workerType: WorkerType
190 ): number | undefined {
191 if (workerType === WorkerTypes.thread) {
192 return worker.threadId
193 } else if (workerType === WorkerTypes.cluster) {
194 return worker.id
195 }
196 }
197}