]>
Commit | Line | Data |
---|---|---|
1 | import { expect } from '@std/expect' | |
2 | import { Worker as ClusterWorker } from 'node:cluster' | |
3 | import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' | |
4 | ||
5 | import { CircularBuffer } from '../../lib/circular-buffer.cjs' | |
6 | import { WorkerTypes } from '../../lib/index.cjs' | |
7 | import { WorkerNode } from '../../lib/pools/worker-node.cjs' | |
8 | import { MeasurementHistorySize } from '../../lib/pools/worker.cjs' | |
9 | import { PriorityQueue } from '../../lib/queues/priority-queue.cjs' | |
10 | import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' | |
11 | ||
12 | describe('Worker node test suite', () => { | |
13 | let clusterWorkerNode, threadWorkerNode | |
14 | ||
before('Create worker nodes', () => {
  // Suite-wide fixtures: one thread-backed and one cluster-backed worker node,
  // both configured with the same queue settings. The factory returns a fresh
  // options object on every call so the two nodes never share one instance.
  const buildWorkerNodeOptions = () => ({
    tasksQueueBackPressureSize: 12,
    tasksQueueBucketSize: 6,
    tasksQueuePriority: true,
  })

  threadWorkerNode = new WorkerNode(
    WorkerTypes.thread,
    './tests/worker-files/thread/testWorker.mjs',
    buildWorkerNodeOptions()
  )
  clusterWorkerNode = new WorkerNode(
    WorkerTypes.cluster,
    './tests/worker-files/cluster/testWorker.cjs',
    buildWorkerNodeOptions()
  )
})
35 | ||
after('Terminate worker nodes', async () => {
  // Teardown for the suite-wide fixtures. The nodes are still terminated one
  // after the other (thread first), but the cluster node is now terminated
  // even when the thread node termination rejects, so a failing teardown step
  // cannot leave the other worker running. The first failure still propagates.
  // Optional chaining covers a node that was never created because setup
  // failed part-way through.
  try {
    await threadWorkerNode?.terminate()
  } finally {
    await clusterWorkerNode?.terminate()
  }
})
40 | ||
it('Worker node instantiation', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Builds constructor arguments for a thread worker node; called without an
  // argument it yields only the worker type and the worker file.
  const threadArgs = (...options) => [
    WorkerTypes.thread,
    threadWorkerFile,
    ...options,
  ]

  // Constructor arguments paired with the error each one must raise. The
  // entries are checked in the listed order.
  const invalidConstructions = [
    {
      args: [],
      error: new TypeError(
        'Cannot construct a worker node without a worker type'
      ),
    },
    {
      args: ['invalidWorkerType', threadWorkerFile],
      error: new TypeError(
        "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
      ),
    },
    {
      args: threadArgs(),
      error: new TypeError(
        'Cannot construct a worker node without worker node options'
      ),
    },
    {
      args: threadArgs(''),
      error: new TypeError(
        'Cannot construct a worker node with invalid worker node options: must be a plain object'
      ),
    },
    {
      args: threadArgs({}),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size option'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
      ),
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 0.2 }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
      ),
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 0 }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
      ),
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: -1 }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
      ),
    },
    {
      args: threadArgs({ tasksQueueBackPressureSize: 12 }),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue bucket size option'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 'invalidTasksQueueBucketSize',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0.2,
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 0,
      }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: -1,
      }),
      error: new RangeError(
        'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
      }),
      error: new TypeError(
        'Cannot construct a worker node without a tasks queue priority option'
      ),
    },
    {
      args: threadArgs({
        tasksQueueBackPressureSize: 12,
        tasksQueueBucketSize: 6,
        tasksQueuePriority: 'invalidTasksQueuePriority',
      }),
      error: new TypeError(
        'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
      ),
    },
  ]
  for (const { args, error } of invalidConstructions) {
    expect(() => new WorkerNode(...args)).toThrow(error)
  }

  // Expected shape of the usage statistics of a freshly created worker node.
  const initialUsage = () => ({
    elu: {
      active: {
        history: expect.any(CircularBuffer),
      },
      idle: {
        history: expect.any(CircularBuffer),
      },
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    tasks: {
      executed: 0,
      executing: 0,
      failed: 0,
      maxQueued: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
  })

  // The two fixtures differ only in the worker class, where the worker id is
  // read from, the reported type and the message channel expectation.
  const fixtures = [
    {
      checkMessageChannel: channel =>
        expect(channel).toBeInstanceOf(MessageChannel),
      node: threadWorkerNode,
      type: WorkerTypes.thread,
      WorkerClass: ThreadWorker,
      workerIdOf: worker => worker.threadId,
    },
    {
      checkMessageChannel: channel => expect(channel).toBeUndefined(),
      node: clusterWorkerNode,
      type: WorkerTypes.cluster,
      WorkerClass: ClusterWorker,
      workerIdOf: worker => worker.id,
    },
  ]
  for (const fixture of fixtures) {
    const { checkMessageChannel, node, type, WorkerClass, workerIdOf } = fixture

    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.worker).toBeInstanceOf(WorkerClass)
    expect(node.info).toStrictEqual({
      backPressure: false,
      backPressureStealing: false,
      continuousStealing: false,
      dynamic: false,
      id: workerIdOf(node.worker),
      queuedTaskAbortion: false,
      ready: false,
      stealing: false,
      stolen: false,
      type,
    })
    expect(node.usage).toStrictEqual(initialUsage())

    // Every measurement history must hold MeasurementHistorySize items.
    const { elu, runTime, waitTime } = node.usage
    for (const { history } of [runTime, waitTime, elu.idle, elu.active]) {
      expect(history.items.length).toBe(MeasurementHistorySize)
    }

    checkMessageChannel(node.messageChannel)
    expect(node.tasksQueueBackPressureSize).toBe(12)

    expect(node.tasksQueue).toBeInstanceOf(PriorityQueue)
    expect(node.tasksQueue.size).toBe(0)
    expect(node.tasksQueue.bucketSize).toBe(6)
    expect(node.tasksQueue.enablePriority).toBe(true)
    expect(node.tasksQueueSize()).toBe(node.tasksQueue.size)

    expect(node.setBackPressureFlag).toBe(false)
    expect(node.taskFunctionsUsage).toBeInstanceOf(Map)
  }
})
361 | ||
it('Worker node getTaskFunctionWorkerUsage()', () => {
  // Asserts that looking up an unknown task function throws the given message.
  const expectInvalidLookupToThrow = message => {
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(new Error(message))
  }
  // Turns task function names into a task function properties list.
  const toProperties = names => names.map(name => ({ name }))
  // Expected shape of a task function usage that has not run any task yet.
  const initialTaskFunctionUsage = () => ({
    elu: {
      active: {
        history: expect.any(CircularBuffer),
      },
      idle: {
        history: expect.any(CircularBuffer),
      },
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    tasks: {
      executed: 0,
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
  })

  // No task function properties list has been set on the node yet.
  expectInvalidLookupToThrow(
    "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
  )

  // A list with only two entries is still rejected.
  threadWorkerNode.info.taskFunctionsProperties = toProperties([
    DEFAULT_TASK_NAME,
    'fn1',
  ])
  expectInvalidLookupToThrow(
    "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
  )

  const taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
  threadWorkerNode.info.taskFunctionsProperties =
    toProperties(taskFunctionNames)
  for (const taskFunctionName of taskFunctionNames) {
    expect(
      threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
    ).toStrictEqual(initialTaskFunctionUsage())
  }
  // Three names were looked up but only two usage entries are expected.
  expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
462 | ||
it('Worker node deleteTaskFunctionWorkerUsage()', () => {
  // Asserts how many task function usage entries the thread node tracks.
  const expectTrackedUsages = count => {
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(count)
  }

  // Precondition: the properties list and the two usage entries checked here
  // must already be present on the shared thread worker node.
  const expectedProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'fn1' },
    { name: 'fn2' },
  ]
  expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual(
    expectedProperties
  )
  expectTrackedUsages(2)

  // Deleting an unknown task function reports false and removes nothing.
  const unknownDeleted = threadWorkerNode.deleteTaskFunctionWorkerUsage(
    'invalidTaskFunction'
  )
  expect(unknownDeleted).toBe(false)
  expectTrackedUsages(2)

  // Deleting a tracked task function reports true and drops one entry.
  const knownDeleted = threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')
  expect(knownDeleted).toBe(true)
  expectTrackedUsages(1)
})
477 | }) |