refactor: code cleanups
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
7
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
10
11 import { CircularArray } from '../../lib/circular-array.cjs'
12 import {
13 DynamicClusterPool,
14 DynamicThreadPool,
15 FixedClusterPool,
16 FixedThreadPool,
17 PoolEvents,
18 PoolTypes,
19 WorkerChoiceStrategies,
20 WorkerTypes
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
26
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
29 readFileSync(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 'utf8'
32 )
33 ).version
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
36 isMain () {
37 return false
38 }
39 }
40
41 afterEach(() => {
42 restore()
43 })
44
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
47 numberOfWorkers,
48 './tests/worker-files/thread/testWorker.mjs'
49 )
50 expect(pool).toBeInstanceOf(FixedThreadPool)
51 await pool.destroy()
52 })
53
54 it('Verify that pool cannot be created from a non main thread/process', () => {
55 expect(
56 () =>
57 new StubPoolWithIsMain(
58 numberOfWorkers,
59 './tests/worker-files/thread/testWorker.mjs',
60 {
61 errorHandler: e => console.error(e)
62 }
63 )
64 ).toThrow(
65 new Error(
66 'Cannot start a pool from a worker with the same type as the pool'
67 )
68 )
69 })
70
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
73 numberOfWorkers,
74 './tests/worker-files/thread/testWorker.mjs'
75 )
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
79 await pool.destroy()
80 })
81
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
85 )
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
88 )
89 expect(
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
92 })
93
94 it('Verify that numberOfWorkers is checked', () => {
95 expect(
96 () =>
97 new FixedThreadPool(
98 undefined,
99 './tests/worker-files/thread/testWorker.mjs'
100 )
101 ).toThrow(
102 new Error(
103 'Cannot instantiate a pool without specifying the number of workers'
104 )
105 )
106 })
107
108 it('Verify that a negative number of workers is checked', () => {
109 expect(
110 () =>
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
112 ).toThrow(
113 new RangeError(
114 'Cannot instantiate a pool with a negative number of workers'
115 )
116 )
117 })
118
119 it('Verify that a non integer number of workers is checked', () => {
120 expect(
121 () =>
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
123 ).toThrow(
124 new TypeError(
125 'Cannot instantiate a pool with a non safe integer number of workers'
126 )
127 )
128 })
129
130 it('Verify that pool arguments number and pool type are checked', () => {
131 expect(
132 () =>
133 new FixedThreadPool(
134 numberOfWorkers,
135 './tests/worker-files/thread/testWorker.mjs',
136 undefined,
137 numberOfWorkers * 2
138 )
139 ).toThrow(
140 new Error(
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
142 )
143 )
144 })
145
146 it('Verify that dynamic pool sizing is checked', () => {
147 expect(
148 () =>
149 new DynamicClusterPool(
150 1,
151 undefined,
152 './tests/worker-files/cluster/testWorker.cjs'
153 )
154 ).toThrow(
155 new TypeError(
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
157 )
158 )
159 expect(
160 () =>
161 new DynamicThreadPool(
162 0.5,
163 1,
164 './tests/worker-files/thread/testWorker.mjs'
165 )
166 ).toThrow(
167 new TypeError(
168 'Cannot instantiate a pool with a non safe integer number of workers'
169 )
170 )
171 expect(
172 () =>
173 new DynamicClusterPool(
174 0,
175 0.5,
176 './tests/worker-files/cluster/testWorker.cjs'
177 )
178 ).toThrow(
179 new TypeError(
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
181 )
182 )
183 expect(
184 () =>
185 new DynamicThreadPool(
186 2,
187 1,
188 './tests/worker-files/thread/testWorker.mjs'
189 )
190 ).toThrow(
191 new RangeError(
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
193 )
194 )
195 expect(
196 () =>
197 new DynamicThreadPool(
198 0,
199 0,
200 './tests/worker-files/thread/testWorker.mjs'
201 )
202 ).toThrow(
203 new RangeError(
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
205 )
206 )
207 expect(
208 () =>
209 new DynamicClusterPool(
210 1,
211 1,
212 './tests/worker-files/cluster/testWorker.cjs'
213 )
214 ).toThrow(
215 new RangeError(
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
217 )
218 )
219 })
220
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
223 numberOfWorkers,
224 './tests/worker-files/thread/testWorker.mjs'
225 )
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
229 startWorkers: true,
230 enableEvents: true,
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
234 })
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
244 })
245 })
246 }
247 await pool.destroy()
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
250 numberOfWorkers,
251 './tests/worker-files/thread/testWorker.mjs',
252 {
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
257 },
258 enableEvents: false,
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
266 }
267 )
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
270 startWorkers: true,
271 enableEvents: false,
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: {
275 concurrency: 2,
276 size: Math.pow(numberOfWorkers, 2),
277 taskStealing: true,
278 tasksStealingOnBackPressure: false,
279 tasksFinishedTimeout: 2000
280 },
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
285 },
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
290 })
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
298 })
299 }
300 await pool.destroy()
301 })
302
303 it('Verify that pool options are validated', () => {
304 expect(
305 () =>
306 new FixedThreadPool(
307 numberOfWorkers,
308 './tests/worker-files/thread/testWorker.mjs',
309 {
310 workerChoiceStrategy: 'invalidStrategy'
311 }
312 )
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
314 expect(
315 () =>
316 new FixedThreadPool(
317 numberOfWorkers,
318 './tests/worker-files/thread/testWorker.mjs',
319 {
320 workerChoiceStrategyOptions: { weights: {} }
321 }
322 )
323 ).toThrow(
324 new Error(
325 'Invalid worker choice strategy options: must have a weight for each worker node'
326 )
327 )
328 expect(
329 () =>
330 new FixedThreadPool(
331 numberOfWorkers,
332 './tests/worker-files/thread/testWorker.mjs',
333 {
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
335 }
336 )
337 ).toThrow(
338 new Error(
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
340 )
341 )
342 expect(
343 () =>
344 new FixedThreadPool(
345 numberOfWorkers,
346 './tests/worker-files/thread/testWorker.mjs',
347 {
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
350 }
351 )
352 ).toThrow(
353 new TypeError('Invalid tasks queue options: must be a plain object')
354 )
355 expect(
356 () =>
357 new FixedThreadPool(
358 numberOfWorkers,
359 './tests/worker-files/thread/testWorker.mjs',
360 {
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
363 }
364 )
365 ).toThrow(
366 new RangeError(
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
368 )
369 )
370 expect(
371 () =>
372 new FixedThreadPool(
373 numberOfWorkers,
374 './tests/worker-files/thread/testWorker.mjs',
375 {
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
378 }
379 )
380 ).toThrow(
381 new RangeError(
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
383 )
384 )
385 expect(
386 () =>
387 new FixedThreadPool(
388 numberOfWorkers,
389 './tests/worker-files/thread/testWorker.mjs',
390 {
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
393 }
394 )
395 ).toThrow(
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
397 )
398 expect(
399 () =>
400 new FixedThreadPool(
401 numberOfWorkers,
402 './tests/worker-files/thread/testWorker.mjs',
403 {
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
406 }
407 )
408 ).toThrow(
409 new RangeError(
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
411 )
412 )
413 expect(
414 () =>
415 new FixedThreadPool(
416 numberOfWorkers,
417 './tests/worker-files/thread/testWorker.mjs',
418 {
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
421 }
422 )
423 ).toThrow(
424 new RangeError(
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
426 )
427 )
428 expect(
429 () =>
430 new FixedThreadPool(
431 numberOfWorkers,
432 './tests/worker-files/thread/testWorker.mjs',
433 {
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
436 }
437 )
438 ).toThrow(
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
440 )
441 })
442
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
445 numberOfWorkers,
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
448 )
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
459 })
460 })
461 }
462 expect(
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
464 ).toStrictEqual({
465 runTime: {
466 aggregate: true,
467 average: true,
468 median: false
469 },
470 waitTime: {
471 aggregate: true,
472 average: true,
473 median: false
474 },
475 elu: {
476 aggregate: true,
477 average: true,
478 median: false
479 }
480 })
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
484 })
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
488 })
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
498 })
499 })
500 }
501 expect(
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
503 ).toStrictEqual({
504 runTime: {
505 aggregate: true,
506 average: false,
507 median: true
508 },
509 waitTime: {
510 aggregate: true,
511 average: true,
512 median: false
513 },
514 elu: {
515 aggregate: true,
516 average: false,
517 median: true
518 }
519 })
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
523 })
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
527 })
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
537 })
538 })
539 }
540 expect(
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
542 ).toStrictEqual({
543 runTime: {
544 aggregate: true,
545 average: true,
546 median: false
547 },
548 waitTime: {
549 aggregate: true,
550 average: true,
551 median: false
552 },
553 elu: {
554 aggregate: true,
555 average: true,
556 median: false
557 }
558 })
559 expect(() =>
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
561 ).toThrow(
562 new TypeError(
563 'Invalid worker choice strategy options: must be a plain object'
564 )
565 )
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
567 new Error(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
569 )
570 )
571 expect(() =>
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
573 ).toThrow(
574 new Error(
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
576 )
577 )
578 await pool.destroy()
579 })
580
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
583 numberOfWorkers,
584 './tests/worker-files/thread/testWorker.mjs'
585 )
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
591 concurrency: 1,
592 size: Math.pow(numberOfWorkers, 2),
593 taskStealing: true,
594 tasksStealingOnBackPressure: false,
595 tasksFinishedTimeout: 2000
596 })
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
600 concurrency: 2,
601 size: Math.pow(numberOfWorkers, 2),
602 taskStealing: true,
603 tasksStealingOnBackPressure: false,
604 tasksFinishedTimeout: 2000
605 })
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
609 await pool.destroy()
610 })
611
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
614 numberOfWorkers,
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
617 )
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
619 concurrency: 1,
620 size: Math.pow(numberOfWorkers, 2),
621 taskStealing: true,
622 tasksStealingOnBackPressure: false,
623 tasksFinishedTimeout: 2000
624 })
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
628 )
629 }
630 pool.setTasksQueueOptions({
631 concurrency: 2,
632 size: 2,
633 taskStealing: false,
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
636 })
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
638 concurrency: 2,
639 size: 2,
640 taskStealing: false,
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
643 })
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
647 )
648 }
649 pool.setTasksQueueOptions({
650 concurrency: 1,
651 taskStealing: true,
652 tasksStealingOnBackPressure: true
653 })
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
655 concurrency: 1,
656 size: Math.pow(numberOfWorkers, 2),
657 taskStealing: true,
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
660 })
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
664 )
665 }
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
668 )
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
670 new RangeError(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
672 )
673 )
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
675 new RangeError(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
677 )
678 )
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
681 )
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
683 new RangeError(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
685 )
686 )
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
688 new RangeError(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
690 )
691 )
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
694 )
695 await pool.destroy()
696 })
697
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
700 numberOfWorkers,
701 './tests/worker-files/thread/testWorker.mjs'
702 )
703 expect(pool.info).toStrictEqual({
704 version,
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
707 started: true,
708 ready: true,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
710 strategyRetries: 0,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
715 busyWorkerNodes: 0,
716 executedTasks: 0,
717 executingTasks: 0,
718 failedTasks: 0
719 })
720 await pool.destroy()
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
723 numberOfWorkers,
724 './tests/worker-files/cluster/testWorker.cjs'
725 )
726 expect(pool.info).toStrictEqual({
727 version,
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
730 started: true,
731 ready: true,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
733 strategyRetries: 0,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
738 busyWorkerNodes: 0,
739 executedTasks: 0,
740 executingTasks: 0,
741 failedTasks: 0
742 })
743 await pool.destroy()
744 })
745
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
748 numberOfWorkers,
749 './tests/worker-files/cluster/testWorker.cjs'
750 )
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
754 tasks: {
755 executed: 0,
756 executing: 0,
757 queued: 0,
758 maxQueued: 0,
759 sequentiallyStolen: 0,
760 stolen: 0,
761 failed: 0
762 },
763 runTime: {
764 history: new CircularArray()
765 },
766 waitTime: {
767 history: new CircularArray()
768 },
769 elu: {
770 idle: {
771 history: new CircularArray()
772 },
773 active: {
774 history: new CircularArray()
775 }
776 }
777 })
778 }
779 await pool.destroy()
780 })
781
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
784 numberOfWorkers,
785 './tests/worker-files/cluster/testWorker.cjs'
786 )
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
792 expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
793 }
794 await pool.destroy()
795 pool = new DynamicThreadPool(
796 Math.floor(numberOfWorkers / 2),
797 numberOfWorkers,
798 './tests/worker-files/thread/testWorker.mjs'
799 )
800 for (const workerNode of pool.workerNodes) {
801 expect(workerNode).toBeInstanceOf(WorkerNode)
802 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
803 expect(workerNode.tasksQueue.size).toBe(0)
804 expect(workerNode.tasksQueue.maxSize).toBe(0)
805 expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
806 }
807 await pool.destroy()
808 })
809
810 it('Verify that pool worker info are initialized', async () => {
811 let pool = new FixedClusterPool(
812 numberOfWorkers,
813 './tests/worker-files/cluster/testWorker.cjs'
814 )
815 for (const workerNode of pool.workerNodes) {
816 expect(workerNode).toBeInstanceOf(WorkerNode)
817 expect(workerNode.info).toStrictEqual({
818 id: expect.any(Number),
819 type: WorkerTypes.cluster,
820 dynamic: false,
821 ready: true,
822 stealing: false,
823 backPressure: false
824 })
825 }
826 await pool.destroy()
827 pool = new DynamicThreadPool(
828 Math.floor(numberOfWorkers / 2),
829 numberOfWorkers,
830 './tests/worker-files/thread/testWorker.mjs'
831 )
832 for (const workerNode of pool.workerNodes) {
833 expect(workerNode).toBeInstanceOf(WorkerNode)
834 expect(workerNode.info).toStrictEqual({
835 id: expect.any(Number),
836 type: WorkerTypes.thread,
837 dynamic: false,
838 ready: true,
839 stealing: false,
840 backPressure: false
841 })
842 }
843 await pool.destroy()
844 })
845
846 it('Verify that pool statuses are checked at start or destroy', async () => {
847 const pool = new FixedThreadPool(
848 numberOfWorkers,
849 './tests/worker-files/thread/testWorker.mjs'
850 )
851 expect(pool.info.started).toBe(true)
852 expect(pool.info.ready).toBe(true)
853 expect(() => pool.start()).toThrow(
854 new Error('Cannot start an already started pool')
855 )
856 await pool.destroy()
857 expect(pool.info.started).toBe(false)
858 expect(pool.info.ready).toBe(false)
859 await expect(pool.destroy()).rejects.toThrow(
860 new Error('Cannot destroy an already destroyed pool')
861 )
862 })
863
864 it('Verify that pool can be started after initialization', async () => {
865 const pool = new FixedClusterPool(
866 numberOfWorkers,
867 './tests/worker-files/cluster/testWorker.cjs',
868 {
869 startWorkers: false
870 }
871 )
872 expect(pool.info.started).toBe(false)
873 expect(pool.info.ready).toBe(false)
874 expect(pool.workerNodes).toStrictEqual([])
875 expect(pool.readyEventEmitted).toBe(false)
876 await expect(pool.execute()).rejects.toThrow(
877 new Error('Cannot execute a task on not started pool')
878 )
879 pool.start()
880 expect(pool.info.started).toBe(true)
881 expect(pool.info.ready).toBe(true)
882 await waitPoolEvents(pool, PoolEvents.ready, 1)
883 expect(pool.readyEventEmitted).toBe(true)
884 expect(pool.workerNodes.length).toBe(numberOfWorkers)
885 for (const workerNode of pool.workerNodes) {
886 expect(workerNode).toBeInstanceOf(WorkerNode)
887 }
888 await pool.destroy()
889 })
890
891 it('Verify that pool execute() arguments are checked', async () => {
892 const pool = new FixedClusterPool(
893 numberOfWorkers,
894 './tests/worker-files/cluster/testWorker.cjs'
895 )
896 await expect(pool.execute(undefined, 0)).rejects.toThrow(
897 new TypeError('name argument must be a string')
898 )
899 await expect(pool.execute(undefined, '')).rejects.toThrow(
900 new TypeError('name argument must not be an empty string')
901 )
902 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
903 new TypeError('transferList argument must be an array')
904 )
905 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
906 "Task function 'unknown' not found"
907 )
908 await pool.destroy()
909 await expect(pool.execute()).rejects.toThrow(
910 new Error('Cannot execute a task on not started pool')
911 )
912 })
913
914 it('Verify that pool worker tasks usage are computed', async () => {
915 const pool = new FixedClusterPool(
916 numberOfWorkers,
917 './tests/worker-files/cluster/testWorker.cjs'
918 )
919 const promises = new Set()
920 const maxMultiplier = 2
921 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
922 promises.add(pool.execute())
923 }
924 for (const workerNode of pool.workerNodes) {
925 expect(workerNode.usage).toStrictEqual({
926 tasks: {
927 executed: 0,
928 executing: maxMultiplier,
929 queued: 0,
930 maxQueued: 0,
931 sequentiallyStolen: 0,
932 stolen: 0,
933 failed: 0
934 },
935 runTime: {
936 history: expect.any(CircularArray)
937 },
938 waitTime: {
939 history: expect.any(CircularArray)
940 },
941 elu: {
942 idle: {
943 history: expect.any(CircularArray)
944 },
945 active: {
946 history: expect.any(CircularArray)
947 }
948 }
949 })
950 }
951 await Promise.all(promises)
952 for (const workerNode of pool.workerNodes) {
953 expect(workerNode.usage).toStrictEqual({
954 tasks: {
955 executed: maxMultiplier,
956 executing: 0,
957 queued: 0,
958 maxQueued: 0,
959 sequentiallyStolen: 0,
960 stolen: 0,
961 failed: 0
962 },
963 runTime: {
964 history: expect.any(CircularArray)
965 },
966 waitTime: {
967 history: expect.any(CircularArray)
968 },
969 elu: {
970 idle: {
971 history: expect.any(CircularArray)
972 },
973 active: {
974 history: expect.any(CircularArray)
975 }
976 }
977 })
978 }
979 await pool.destroy()
980 })
981
982 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
983 const pool = new DynamicThreadPool(
984 Math.floor(numberOfWorkers / 2),
985 numberOfWorkers,
986 './tests/worker-files/thread/testWorker.mjs'
987 )
988 const promises = new Set()
989 const maxMultiplier = 2
990 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
991 promises.add(pool.execute())
992 }
993 await Promise.all(promises)
994 for (const workerNode of pool.workerNodes) {
995 expect(workerNode.usage).toStrictEqual({
996 tasks: {
997 executed: expect.any(Number),
998 executing: 0,
999 queued: 0,
1000 maxQueued: 0,
1001 sequentiallyStolen: 0,
1002 stolen: 0,
1003 failed: 0
1004 },
1005 runTime: {
1006 history: expect.any(CircularArray)
1007 },
1008 waitTime: {
1009 history: expect.any(CircularArray)
1010 },
1011 elu: {
1012 idle: {
1013 history: expect.any(CircularArray)
1014 },
1015 active: {
1016 history: expect.any(CircularArray)
1017 }
1018 }
1019 })
1020 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1021 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1022 numberOfWorkers * maxMultiplier
1023 )
1024 expect(workerNode.usage.runTime.history.length).toBe(0)
1025 expect(workerNode.usage.waitTime.history.length).toBe(0)
1026 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1027 expect(workerNode.usage.elu.active.history.length).toBe(0)
1028 }
1029 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1030 for (const workerNode of pool.workerNodes) {
1031 expect(workerNode.usage).toStrictEqual({
1032 tasks: {
1033 executed: expect.any(Number),
1034 executing: 0,
1035 queued: 0,
1036 maxQueued: 0,
1037 sequentiallyStolen: 0,
1038 stolen: 0,
1039 failed: 0
1040 },
1041 runTime: {
1042 history: expect.any(CircularArray)
1043 },
1044 waitTime: {
1045 history: expect.any(CircularArray)
1046 },
1047 elu: {
1048 idle: {
1049 history: expect.any(CircularArray)
1050 },
1051 active: {
1052 history: expect.any(CircularArray)
1053 }
1054 }
1055 })
1056 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1057 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1058 numberOfWorkers * maxMultiplier
1059 )
1060 expect(workerNode.usage.runTime.history.length).toBe(0)
1061 expect(workerNode.usage.waitTime.history.length).toBe(0)
1062 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1063 expect(workerNode.usage.elu.active.history.length).toBe(0)
1064 }
1065 await pool.destroy()
1066 })
1067
1068 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1069 const pool = new DynamicClusterPool(
1070 Math.floor(numberOfWorkers / 2),
1071 numberOfWorkers,
1072 './tests/worker-files/cluster/testWorker.cjs'
1073 )
1074 expect(pool.emitter.eventNames()).toStrictEqual([])
1075 let poolInfo
1076 let poolReady = 0
1077 pool.emitter.on(PoolEvents.ready, info => {
1078 ++poolReady
1079 poolInfo = info
1080 })
1081 await waitPoolEvents(pool, PoolEvents.ready, 1)
1082 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1083 expect(poolReady).toBe(1)
1084 expect(poolInfo).toStrictEqual({
1085 version,
1086 type: PoolTypes.dynamic,
1087 worker: WorkerTypes.cluster,
1088 started: true,
1089 ready: true,
1090 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1091 strategyRetries: expect.any(Number),
1092 minSize: expect.any(Number),
1093 maxSize: expect.any(Number),
1094 workerNodes: expect.any(Number),
1095 idleWorkerNodes: expect.any(Number),
1096 busyWorkerNodes: expect.any(Number),
1097 executedTasks: expect.any(Number),
1098 executingTasks: expect.any(Number),
1099 failedTasks: expect.any(Number)
1100 })
1101 await pool.destroy()
1102 })
1103
1104 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1105 const pool = new FixedThreadPool(
1106 numberOfWorkers,
1107 './tests/worker-files/thread/testWorker.mjs'
1108 )
1109 expect(pool.emitter.eventNames()).toStrictEqual([])
1110 const promises = new Set()
1111 let poolBusy = 0
1112 let poolInfo
1113 pool.emitter.on(PoolEvents.busy, info => {
1114 ++poolBusy
1115 poolInfo = info
1116 })
1117 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1118 for (let i = 0; i < numberOfWorkers * 2; i++) {
1119 promises.add(pool.execute())
1120 }
1121 await Promise.all(promises)
1122 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1123 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1124 expect(poolBusy).toBe(numberOfWorkers + 1)
1125 expect(poolInfo).toStrictEqual({
1126 version,
1127 type: PoolTypes.fixed,
1128 worker: WorkerTypes.thread,
1129 started: true,
1130 ready: true,
1131 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1132 strategyRetries: expect.any(Number),
1133 minSize: expect.any(Number),
1134 maxSize: expect.any(Number),
1135 workerNodes: expect.any(Number),
1136 idleWorkerNodes: expect.any(Number),
1137 busyWorkerNodes: expect.any(Number),
1138 executedTasks: expect.any(Number),
1139 executingTasks: expect.any(Number),
1140 failedTasks: expect.any(Number)
1141 })
1142 await pool.destroy()
1143 })
1144
1145 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1146 const pool = new DynamicThreadPool(
1147 Math.floor(numberOfWorkers / 2),
1148 numberOfWorkers,
1149 './tests/worker-files/thread/testWorker.mjs'
1150 )
1151 expect(pool.emitter.eventNames()).toStrictEqual([])
1152 const promises = new Set()
1153 let poolFull = 0
1154 let poolInfo
1155 pool.emitter.on(PoolEvents.full, info => {
1156 ++poolFull
1157 poolInfo = info
1158 })
1159 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1160 for (let i = 0; i < numberOfWorkers * 2; i++) {
1161 promises.add(pool.execute())
1162 }
1163 await Promise.all(promises)
1164 expect(poolFull).toBe(1)
1165 expect(poolInfo).toStrictEqual({
1166 version,
1167 type: PoolTypes.dynamic,
1168 worker: WorkerTypes.thread,
1169 started: true,
1170 ready: true,
1171 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1172 strategyRetries: expect.any(Number),
1173 minSize: expect.any(Number),
1174 maxSize: expect.any(Number),
1175 workerNodes: expect.any(Number),
1176 idleWorkerNodes: expect.any(Number),
1177 busyWorkerNodes: expect.any(Number),
1178 executedTasks: expect.any(Number),
1179 executingTasks: expect.any(Number),
1180 failedTasks: expect.any(Number)
1181 })
1182 await pool.destroy()
1183 })
1184
1185 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1186 const pool = new FixedThreadPool(
1187 numberOfWorkers,
1188 './tests/worker-files/thread/testWorker.mjs',
1189 {
1190 enableTasksQueue: true
1191 }
1192 )
1193 stub(pool, 'hasBackPressure').returns(true)
1194 expect(pool.emitter.eventNames()).toStrictEqual([])
1195 const promises = new Set()
1196 let poolBackPressure = 0
1197 let poolInfo
1198 pool.emitter.on(PoolEvents.backPressure, info => {
1199 ++poolBackPressure
1200 poolInfo = info
1201 })
1202 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1203 for (let i = 0; i < numberOfWorkers + 1; i++) {
1204 promises.add(pool.execute())
1205 }
1206 await Promise.all(promises)
1207 expect(poolBackPressure).toBe(1)
1208 expect(poolInfo).toStrictEqual({
1209 version,
1210 type: PoolTypes.fixed,
1211 worker: WorkerTypes.thread,
1212 started: true,
1213 ready: true,
1214 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1215 strategyRetries: expect.any(Number),
1216 minSize: expect.any(Number),
1217 maxSize: expect.any(Number),
1218 workerNodes: expect.any(Number),
1219 idleWorkerNodes: expect.any(Number),
1220 stealingWorkerNodes: expect.any(Number),
1221 busyWorkerNodes: expect.any(Number),
1222 executedTasks: expect.any(Number),
1223 executingTasks: expect.any(Number),
1224 maxQueuedTasks: expect.any(Number),
1225 queuedTasks: expect.any(Number),
1226 backPressure: true,
1227 stolenTasks: expect.any(Number),
1228 failedTasks: expect.any(Number)
1229 })
1230 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1231 await pool.destroy()
1232 })
1233
1234 it('Verify that destroy() waits for queued tasks to finish', async () => {
1235 const tasksFinishedTimeout = 2500
1236 const pool = new FixedThreadPool(
1237 numberOfWorkers,
1238 './tests/worker-files/thread/asyncWorker.mjs',
1239 {
1240 enableTasksQueue: true,
1241 tasksQueueOptions: { tasksFinishedTimeout }
1242 }
1243 )
1244 const maxMultiplier = 4
1245 let tasksFinished = 0
1246 for (const workerNode of pool.workerNodes) {
1247 workerNode.on('taskFinished', () => {
1248 ++tasksFinished
1249 })
1250 }
1251 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1252 pool.execute()
1253 }
1254 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1255 const startTime = performance.now()
1256 await pool.destroy()
1257 const elapsedTime = performance.now() - startTime
1258 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1259 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1260 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1261 })
1262
1263 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1264 const tasksFinishedTimeout = 1000
1265 const pool = new FixedThreadPool(
1266 numberOfWorkers,
1267 './tests/worker-files/thread/asyncWorker.mjs',
1268 {
1269 enableTasksQueue: true,
1270 tasksQueueOptions: { tasksFinishedTimeout }
1271 }
1272 )
1273 const maxMultiplier = 4
1274 let tasksFinished = 0
1275 for (const workerNode of pool.workerNodes) {
1276 workerNode.on('taskFinished', () => {
1277 ++tasksFinished
1278 })
1279 }
1280 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1281 pool.execute()
1282 }
1283 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1284 const startTime = performance.now()
1285 await pool.destroy()
1286 const elapsedTime = performance.now() - startTime
1287 expect(tasksFinished).toBe(0)
1288 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1289 })
1290
1291 it('Verify that pool asynchronous resource track tasks execution', async () => {
1292 let taskAsyncId
1293 let initCalls = 0
1294 let beforeCalls = 0
1295 let afterCalls = 0
1296 let resolveCalls = 0
1297 const hook = createHook({
1298 init (asyncId, type) {
1299 if (type === 'poolifier:task') {
1300 initCalls++
1301 taskAsyncId = asyncId
1302 }
1303 },
1304 before (asyncId) {
1305 if (asyncId === taskAsyncId) beforeCalls++
1306 },
1307 after (asyncId) {
1308 if (asyncId === taskAsyncId) afterCalls++
1309 },
1310 promiseResolve () {
1311 if (executionAsyncId() === taskAsyncId) resolveCalls++
1312 }
1313 })
1314 const pool = new FixedThreadPool(
1315 numberOfWorkers,
1316 './tests/worker-files/thread/testWorker.mjs'
1317 )
1318 hook.enable()
1319 await pool.execute()
1320 hook.disable()
1321 expect(initCalls).toBe(1)
1322 expect(beforeCalls).toBe(1)
1323 expect(afterCalls).toBe(1)
1324 expect(resolveCalls).toBe(1)
1325 await pool.destroy()
1326 })
1327
1328 it('Verify that hasTaskFunction() is working', async () => {
1329 const dynamicThreadPool = new DynamicThreadPool(
1330 Math.floor(numberOfWorkers / 2),
1331 numberOfWorkers,
1332 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1333 )
1334 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1335 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1336 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1337 true
1338 )
1339 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1340 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1341 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1342 await dynamicThreadPool.destroy()
1343 const fixedClusterPool = new FixedClusterPool(
1344 numberOfWorkers,
1345 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1346 )
1347 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1348 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1349 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1350 true
1351 )
1352 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1353 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1354 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1355 await fixedClusterPool.destroy()
1356 })
1357
1358 it('Verify that addTaskFunction() is working', async () => {
1359 const dynamicThreadPool = new DynamicThreadPool(
1360 Math.floor(numberOfWorkers / 2),
1361 numberOfWorkers,
1362 './tests/worker-files/thread/testWorker.mjs'
1363 )
1364 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1365 await expect(
1366 dynamicThreadPool.addTaskFunction(0, () => {})
1367 ).rejects.toThrow(new TypeError('name argument must be a string'))
1368 await expect(
1369 dynamicThreadPool.addTaskFunction('', () => {})
1370 ).rejects.toThrow(
1371 new TypeError('name argument must not be an empty string')
1372 )
1373 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1374 new TypeError('taskFunction property must be a function')
1375 )
1376 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1377 new TypeError('taskFunction property must be a function')
1378 )
1379 await expect(
1380 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1381 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1382 await expect(
1383 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1384 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1385 await expect(
1386 dynamicThreadPool.addTaskFunction('test', {
1387 taskFunction: () => {},
1388 priority: -21
1389 })
1390 ).rejects.toThrow(
1391 new RangeError("Property 'priority' must be between -20 and 19")
1392 )
1393 await expect(
1394 dynamicThreadPool.addTaskFunction('test', {
1395 taskFunction: () => {},
1396 priority: 20
1397 })
1398 ).rejects.toThrow(
1399 new RangeError("Property 'priority' must be between -20 and 19")
1400 )
1401 await expect(
1402 dynamicThreadPool.addTaskFunction('test', {
1403 taskFunction: () => {},
1404 strategy: 'invalidStrategy'
1405 })
1406 ).rejects.toThrow(
1407 new Error("Invalid worker choice strategy 'invalidStrategy'")
1408 )
1409 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1410 { name: DEFAULT_TASK_NAME },
1411 { name: 'test' }
1412 ])
1413 expect([
1414 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1415 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1416 const echoTaskFunction = data => {
1417 return data
1418 }
1419 await expect(
1420 dynamicThreadPool.addTaskFunction('echo', {
1421 taskFunction: echoTaskFunction,
1422 strategy: WorkerChoiceStrategies.LEAST_ELU
1423 })
1424 ).resolves.toBe(true)
1425 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1426 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1427 taskFunction: echoTaskFunction,
1428 strategy: WorkerChoiceStrategies.LEAST_ELU
1429 })
1430 expect([
1431 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1432 ]).toStrictEqual([
1433 WorkerChoiceStrategies.ROUND_ROBIN,
1434 WorkerChoiceStrategies.LEAST_ELU
1435 ])
1436 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1437 { name: DEFAULT_TASK_NAME },
1438 { name: 'test' },
1439 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1440 ])
1441 const taskFunctionData = { test: 'test' }
1442 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1443 expect(echoResult).toStrictEqual(taskFunctionData)
1444 for (const workerNode of dynamicThreadPool.workerNodes) {
1445 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1446 tasks: {
1447 executed: expect.any(Number),
1448 executing: 0,
1449 queued: 0,
1450 sequentiallyStolen: 0,
1451 stolen: 0,
1452 failed: 0
1453 },
1454 runTime: {
1455 history: new CircularArray()
1456 },
1457 waitTime: {
1458 history: new CircularArray()
1459 },
1460 elu: expect.objectContaining({
1461 idle: expect.objectContaining({
1462 history: expect.any(CircularArray)
1463 }),
1464 active: expect.objectContaining({
1465 history: expect.any(CircularArray)
1466 })
1467 })
1468 })
1469 expect(
1470 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1471 ).toBeGreaterThan(0)
1472 if (
1473 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1474 null
1475 ) {
1476 expect(
1477 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1478 ).toBeUndefined()
1479 } else {
1480 expect(
1481 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1482 ).toBeGreaterThan(0)
1483 }
1484 if (
1485 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1486 ) {
1487 expect(
1488 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1489 ).toBeUndefined()
1490 } else {
1491 expect(
1492 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1493 ).toBeGreaterThanOrEqual(0)
1494 }
1495 if (
1496 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1497 ) {
1498 expect(
1499 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1500 ).toBeUndefined()
1501 } else {
1502 expect(
1503 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1504 ).toBeGreaterThanOrEqual(0)
1505 expect(
1506 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1507 ).toBeLessThanOrEqual(1)
1508 }
1509 }
1510 await dynamicThreadPool.destroy()
1511 })
1512
1513 it('Verify that removeTaskFunction() is working', async () => {
1514 const dynamicThreadPool = new DynamicThreadPool(
1515 Math.floor(numberOfWorkers / 2),
1516 numberOfWorkers,
1517 './tests/worker-files/thread/testWorker.mjs'
1518 )
1519 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1520 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1521 { name: DEFAULT_TASK_NAME },
1522 { name: 'test' }
1523 ])
1524 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1525 new Error('Cannot remove a task function not handled on the pool side')
1526 )
1527 const echoTaskFunction = data => {
1528 return data
1529 }
1530 await dynamicThreadPool.addTaskFunction('echo', {
1531 taskFunction: echoTaskFunction,
1532 strategy: WorkerChoiceStrategies.LEAST_ELU
1533 })
1534 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1535 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1536 taskFunction: echoTaskFunction,
1537 strategy: WorkerChoiceStrategies.LEAST_ELU
1538 })
1539 expect([
1540 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1541 ]).toStrictEqual([
1542 WorkerChoiceStrategies.ROUND_ROBIN,
1543 WorkerChoiceStrategies.LEAST_ELU
1544 ])
1545 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1546 { name: DEFAULT_TASK_NAME },
1547 { name: 'test' },
1548 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1549 ])
1550 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1551 true
1552 )
1553 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1554 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1555 expect([
1556 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1557 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1558 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1559 { name: DEFAULT_TASK_NAME },
1560 { name: 'test' }
1561 ])
1562 await dynamicThreadPool.destroy()
1563 })
1564
1565 it('Verify that listTaskFunctionsProperties() is working', async () => {
1566 const dynamicThreadPool = new DynamicThreadPool(
1567 Math.floor(numberOfWorkers / 2),
1568 numberOfWorkers,
1569 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1570 )
1571 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1572 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1573 { name: DEFAULT_TASK_NAME },
1574 { name: 'jsonIntegerSerialization' },
1575 { name: 'factorial' },
1576 { name: 'fibonacci' }
1577 ])
1578 await dynamicThreadPool.destroy()
1579 const fixedClusterPool = new FixedClusterPool(
1580 numberOfWorkers,
1581 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1582 )
1583 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1584 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1585 { name: DEFAULT_TASK_NAME },
1586 { name: 'jsonIntegerSerialization' },
1587 { name: 'factorial' },
1588 { name: 'fibonacci' }
1589 ])
1590 await fixedClusterPool.destroy()
1591 })
1592
1593 it('Verify that setDefaultTaskFunction() is working', async () => {
1594 const dynamicThreadPool = new DynamicThreadPool(
1595 Math.floor(numberOfWorkers / 2),
1596 numberOfWorkers,
1597 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1598 )
1599 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1600 const workerId = dynamicThreadPool.workerNodes[0].info.id
1601 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1602 new Error(
1603 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1604 )
1605 )
1606 await expect(
1607 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1608 ).rejects.toThrow(
1609 new Error(
1610 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1611 )
1612 )
1613 await expect(
1614 dynamicThreadPool.setDefaultTaskFunction('unknown')
1615 ).rejects.toThrow(
1616 new Error(
1617 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1618 )
1619 )
1620 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1621 { name: DEFAULT_TASK_NAME },
1622 { name: 'jsonIntegerSerialization' },
1623 { name: 'factorial' },
1624 { name: 'fibonacci' }
1625 ])
1626 await expect(
1627 dynamicThreadPool.setDefaultTaskFunction('factorial')
1628 ).resolves.toBe(true)
1629 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1630 { name: DEFAULT_TASK_NAME },
1631 { name: 'factorial' },
1632 { name: 'jsonIntegerSerialization' },
1633 { name: 'fibonacci' }
1634 ])
1635 await expect(
1636 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1637 ).resolves.toBe(true)
1638 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1639 { name: DEFAULT_TASK_NAME },
1640 { name: 'fibonacci' },
1641 { name: 'jsonIntegerSerialization' },
1642 { name: 'factorial' }
1643 ])
1644 await dynamicThreadPool.destroy()
1645 })
1646
1647 it('Verify that multiple task functions worker is working', async () => {
1648 const pool = new DynamicClusterPool(
1649 Math.floor(numberOfWorkers / 2),
1650 numberOfWorkers,
1651 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1652 )
1653 const data = { n: 10 }
1654 const result0 = await pool.execute(data)
1655 expect(result0).toStrictEqual({ ok: 1 })
1656 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1657 expect(result1).toStrictEqual({ ok: 1 })
1658 const result2 = await pool.execute(data, 'factorial')
1659 expect(result2).toBe(3628800)
1660 const result3 = await pool.execute(data, 'fibonacci')
1661 expect(result3).toBe(55)
1662 expect(pool.info.executingTasks).toBe(0)
1663 expect(pool.info.executedTasks).toBe(4)
1664 for (const workerNode of pool.workerNodes) {
1665 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1666 { name: DEFAULT_TASK_NAME },
1667 { name: 'jsonIntegerSerialization' },
1668 { name: 'factorial' },
1669 { name: 'fibonacci' }
1670 ])
1671 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1672 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1673 expect(
1674 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1675 ).toStrictEqual({
1676 tasks: {
1677 executed: expect.any(Number),
1678 executing: 0,
1679 failed: 0,
1680 queued: 0,
1681 sequentiallyStolen: 0,
1682 stolen: 0
1683 },
1684 runTime: {
1685 history: expect.any(CircularArray)
1686 },
1687 waitTime: {
1688 history: expect.any(CircularArray)
1689 },
1690 elu: {
1691 idle: {
1692 history: expect.any(CircularArray)
1693 },
1694 active: {
1695 history: expect.any(CircularArray)
1696 }
1697 }
1698 })
1699 expect(
1700 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1701 .tasks.executed
1702 ).toBeGreaterThan(0)
1703 }
1704 expect(
1705 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1706 ).toStrictEqual(
1707 workerNode.getTaskFunctionWorkerUsage(
1708 workerNode.info.taskFunctionsProperties[1].name
1709 )
1710 )
1711 }
1712 await pool.destroy()
1713 })
1714
1715 it('Verify sendKillMessageToWorker()', async () => {
1716 const pool = new DynamicClusterPool(
1717 Math.floor(numberOfWorkers / 2),
1718 numberOfWorkers,
1719 './tests/worker-files/cluster/testWorker.cjs'
1720 )
1721 const workerNodeKey = 0
1722 await expect(
1723 pool.sendKillMessageToWorker(workerNodeKey)
1724 ).resolves.toBeUndefined()
1725 await pool.destroy()
1726 })
1727
1728 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1729 const pool = new DynamicClusterPool(
1730 Math.floor(numberOfWorkers / 2),
1731 numberOfWorkers,
1732 './tests/worker-files/cluster/testWorker.cjs'
1733 )
1734 const workerNodeKey = 0
1735 await expect(
1736 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1737 taskFunctionOperation: 'add',
1738 taskFunctionProperties: { name: 'empty' },
1739 taskFunction: (() => {}).toString()
1740 })
1741 ).resolves.toBe(true)
1742 expect(
1743 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1744 ).toStrictEqual([
1745 { name: DEFAULT_TASK_NAME },
1746 { name: 'test' },
1747 { name: 'empty' }
1748 ])
1749 await pool.destroy()
1750 })
1751
1752 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1753 const pool = new DynamicClusterPool(
1754 Math.floor(numberOfWorkers / 2),
1755 numberOfWorkers,
1756 './tests/worker-files/cluster/testWorker.cjs'
1757 )
1758 await expect(
1759 pool.sendTaskFunctionOperationToWorkers({
1760 taskFunctionOperation: 'add',
1761 taskFunctionProperties: { name: 'empty' },
1762 taskFunction: (() => {}).toString()
1763 })
1764 ).resolves.toBe(true)
1765 for (const workerNode of pool.workerNodes) {
1766 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1767 { name: DEFAULT_TASK_NAME },
1768 { name: 'test' },
1769 { name: 'empty' }
1770 ])
1771 }
1772 await pool.destroy()
1773 })
1774 })