refactor: cleanup worker error handling code
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
2 import { Worker as ClusterWorker } from 'node:cluster'
3 import { expect } from 'expect'
4 import { WorkerNode } from '../../lib/pools/worker-node.js'
5 import { WorkerTypes } from '../../lib/index.js'
6 import { CircularArray } from '../../lib/circular-array.js'
7 import { Deque } from '../../lib/deque.js'
8 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
9
10 describe('Worker node test suite', () => {
11 const threadWorkerNode = new WorkerNode(
12 WorkerTypes.thread,
13 './tests/worker-files/thread/testWorker.mjs',
14 { tasksQueueBackPressureSize: 12 }
15 )
16 const clusterWorkerNode = new WorkerNode(
17 WorkerTypes.cluster,
18 './tests/worker-files/cluster/testWorker.js',
19 { tasksQueueBackPressureSize: 12 }
20 )
21
22 it('Worker node instantiation', () => {
23 expect(() => new WorkerNode()).toThrow(
24 new TypeError('Cannot construct a worker node without a worker type')
25 )
26 expect(
27 () =>
28 new WorkerNode(
29 'invalidWorkerType',
30 './tests/worker-files/thread/testWorker.mjs',
31 { tasksQueueBackPressureSize: 12 }
32 )
33 ).toThrow(
34 new TypeError(
35 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
36 )
37 )
38 expect(
39 () =>
40 new WorkerNode(
41 WorkerTypes.thread,
42 './tests/worker-files/thread/testWorker.mjs'
43 )
44 ).toThrow(
45 new TypeError(
46 'Cannot construct a worker node without worker node options'
47 )
48 )
49 expect(
50 () =>
51 new WorkerNode(
52 WorkerTypes.thread,
53 './tests/worker-files/thread/testWorker.mjs',
54 ''
55 )
56 ).toThrow(
57 new TypeError(
58 'Cannot construct a worker node with invalid options: must be a plain object'
59 )
60 )
61 expect(
62 () =>
63 new WorkerNode(
64 WorkerTypes.thread,
65 './tests/worker-files/thread/testWorker.mjs',
66 {}
67 )
68 ).toThrow(
69 new TypeError(
70 'Cannot construct a worker node without a tasks queue back pressure size option'
71 )
72 )
73 expect(
74 () =>
75 new WorkerNode(
76 WorkerTypes.thread,
77 './tests/worker-files/thread/testWorker.mjs',
78 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
79 )
80 ).toThrow(
81 new TypeError(
82 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
83 )
84 )
85 expect(
86 () =>
87 new WorkerNode(
88 WorkerTypes.thread,
89 './tests/worker-files/thread/testWorker.mjs',
90 { tasksQueueBackPressureSize: 0.2 }
91 )
92 ).toThrow(
93 new TypeError(
94 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
95 )
96 )
97 expect(
98 () =>
99 new WorkerNode(
100 WorkerTypes.thread,
101 './tests/worker-files/thread/testWorker.mjs',
102 { tasksQueueBackPressureSize: 0 }
103 )
104 ).toThrow(
105 new RangeError(
106 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
107 )
108 )
109 expect(
110 () =>
111 new WorkerNode(
112 WorkerTypes.thread,
113 './tests/worker-files/thread/testWorker.mjs',
114 { tasksQueueBackPressureSize: -1 }
115 )
116 ).toThrow(
117 new RangeError(
118 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
119 )
120 )
121 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
122 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
123 expect(threadWorkerNode.info).toStrictEqual({
124 id: threadWorkerNode.worker.threadId,
125 type: WorkerTypes.thread,
126 dynamic: false,
127 ready: false
128 })
129 expect(threadWorkerNode.usage).toStrictEqual({
130 tasks: {
131 executed: 0,
132 executing: 0,
133 queued: 0,
134 maxQueued: 0,
135 sequentiallyStolen: 0,
136 stolen: 0,
137 failed: 0
138 },
139 runTime: {
140 history: new CircularArray()
141 },
142 waitTime: {
143 history: new CircularArray()
144 },
145 elu: {
146 idle: {
147 history: new CircularArray()
148 },
149 active: {
150 history: new CircularArray()
151 }
152 }
153 })
154 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
155 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
156 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
157 expect(threadWorkerNode.tasksQueue.size).toBe(0)
158 expect(threadWorkerNode.tasksQueueSize()).toBe(
159 threadWorkerNode.tasksQueue.size
160 )
161 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
162 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
163
164 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
165 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
166 expect(clusterWorkerNode.info).toStrictEqual({
167 id: clusterWorkerNode.worker.id,
168 type: WorkerTypes.cluster,
169 dynamic: false,
170 ready: false
171 })
172 expect(clusterWorkerNode.usage).toStrictEqual({
173 tasks: {
174 executed: 0,
175 executing: 0,
176 queued: 0,
177 maxQueued: 0,
178 sequentiallyStolen: 0,
179 stolen: 0,
180 failed: 0
181 },
182 runTime: {
183 history: new CircularArray()
184 },
185 waitTime: {
186 history: new CircularArray()
187 },
188 elu: {
189 idle: {
190 history: new CircularArray()
191 },
192 active: {
193 history: new CircularArray()
194 }
195 }
196 })
197 expect(clusterWorkerNode.messageChannel).toBeUndefined()
198 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
199 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
200 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
201 expect(clusterWorkerNode.tasksQueueSize()).toBe(
202 clusterWorkerNode.tasksQueue.size
203 )
204 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
205 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
206 })
207
208 it('Worker node getTaskFunctionWorkerUsage()', () => {
209 expect(() =>
210 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
211 ).toThrow(
212 new TypeError(
213 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
214 )
215 )
216 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
217 expect(() =>
218 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
219 ).toThrow(
220 new TypeError(
221 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
222 )
223 )
224 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
225 expect(
226 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
227 ).toStrictEqual({
228 tasks: {
229 executed: 0,
230 executing: 0,
231 queued: 0,
232 sequentiallyStolen: 0,
233 stolen: 0,
234 failed: 0
235 },
236 runTime: {
237 history: new CircularArray()
238 },
239 waitTime: {
240 history: new CircularArray()
241 },
242 elu: {
243 idle: {
244 history: new CircularArray()
245 },
246 active: {
247 history: new CircularArray()
248 }
249 }
250 })
251 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
252 tasks: {
253 executed: 0,
254 executing: 0,
255 queued: 0,
256 sequentiallyStolen: 0,
257 stolen: 0,
258 failed: 0
259 },
260 runTime: {
261 history: new CircularArray()
262 },
263 waitTime: {
264 history: new CircularArray()
265 },
266 elu: {
267 idle: {
268 history: new CircularArray()
269 },
270 active: {
271 history: new CircularArray()
272 }
273 }
274 })
275 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
276 tasks: {
277 executed: 0,
278 executing: 0,
279 queued: 0,
280 sequentiallyStolen: 0,
281 stolen: 0,
282 failed: 0
283 },
284 runTime: {
285 history: new CircularArray()
286 },
287 waitTime: {
288 history: new CircularArray()
289 },
290 elu: {
291 idle: {
292 history: new CircularArray()
293 },
294 active: {
295 history: new CircularArray()
296 }
297 }
298 })
299 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
300 })
301
302 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
303 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
304 DEFAULT_TASK_NAME,
305 'fn1',
306 'fn2'
307 ])
308 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
309 expect(
310 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
311 ).toBe(false)
312 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
313 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
314 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
315 })
316 })