test: fix pool destroy() test flakiness
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
26 readFileSync(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
28 'utf8'
29 )
30 ).version
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
33 isMain () {
34 return false
35 }
36 }
37
38 afterEach(() => {
39 restore()
40 })
41
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
52 expect(
53 () =>
54 new StubPoolWithIsMain(
55 numberOfWorkers,
56 './tests/worker-files/thread/testWorker.mjs',
57 {
58 errorHandler: e => console.error(e)
59 }
60 )
61 ).toThrow(
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
65 )
66 })
67
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
71 './tests/worker-files/thread/testWorker.mjs'
72 )
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
76 await pool.destroy()
77 })
78
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
96 './tests/worker-files/thread/testWorker.mjs'
97 )
98 ).toThrow(
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
109 ).toThrow(
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 ).toThrow(
121 new TypeError(
122 'Cannot instantiate a pool with a non safe integer number of workers'
123 )
124 )
125 })
126
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
143 it('Verify that dynamic pool sizing is checked', () => {
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.js'
150 )
151 ).toThrow(
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
161 './tests/worker-files/thread/testWorker.mjs'
162 )
163 ).toThrow(
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
173 './tests/worker-files/cluster/testWorker.js'
174 )
175 ).toThrow(
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
180 expect(
181 () =>
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
187 ).toThrow(
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
199 ).toThrow(
200 new RangeError(
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
202 )
203 )
204 expect(
205 () =>
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.js'
210 )
211 ).toThrow(
212 new RangeError(
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
214 )
215 )
216 })
217
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
221 './tests/worker-files/thread/testWorker.mjs'
222 )
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
230 })
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
232 retries:
233 pool.info.maxSize +
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
242 })
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
247 retries:
248 pool.info.maxSize +
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 })
254 )
255 }
256 await pool.destroy()
257 const testHandler = () => console.info('test handler executed')
258 pool = new FixedThreadPool(
259 numberOfWorkers,
260 './tests/worker-files/thread/testWorker.mjs',
261 {
262 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
263 workerChoiceStrategyOptions: {
264 runTime: { median: true },
265 weights: { 0: 300, 1: 200 }
266 },
267 enableEvents: false,
268 restartWorkerOnError: false,
269 enableTasksQueue: true,
270 tasksQueueOptions: { concurrency: 2 },
271 messageHandler: testHandler,
272 errorHandler: testHandler,
273 onlineHandler: testHandler,
274 exitHandler: testHandler
275 }
276 )
277 expect(pool.emitter).toBeUndefined()
278 expect(pool.opts).toStrictEqual({
279 startWorkers: true,
280 enableEvents: false,
281 restartWorkerOnError: false,
282 enableTasksQueue: true,
283 tasksQueueOptions: {
284 concurrency: 2,
285 size: Math.pow(numberOfWorkers, 2),
286 taskStealing: true,
287 tasksStealingOnBackPressure: true,
288 tasksFinishedTimeout: 2000
289 },
290 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
291 workerChoiceStrategyOptions: {
292 runTime: { median: true },
293 weights: { 0: 300, 1: 200 }
294 },
295 onlineHandler: testHandler,
296 messageHandler: testHandler,
297 errorHandler: testHandler,
298 exitHandler: testHandler
299 })
300 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
301 retries:
302 pool.info.maxSize +
303 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
304 runTime: { median: true },
305 waitTime: { median: false },
306 elu: { median: false },
307 weights: { 0: 300, 1: 200 }
308 })
309 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
310 .workerChoiceStrategies) {
311 expect(workerChoiceStrategy.opts).toStrictEqual({
312 retries:
313 pool.info.maxSize +
314 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
315 runTime: { median: true },
316 waitTime: { median: false },
317 elu: { median: false },
318 weights: { 0: 300, 1: 200 }
319 })
320 }
321 await pool.destroy()
322 })
323
324 it('Verify that pool options are validated', () => {
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
329 './tests/worker-files/thread/testWorker.mjs',
330 {
331 workerChoiceStrategy: 'invalidStrategy'
332 }
333 )
334 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
335 expect(
336 () =>
337 new FixedThreadPool(
338 numberOfWorkers,
339 './tests/worker-files/thread/testWorker.mjs',
340 {
341 workerChoiceStrategyOptions: { weights: {} }
342 }
343 )
344 ).toThrow(
345 new Error(
346 'Invalid worker choice strategy options: must have a weight for each worker node'
347 )
348 )
349 expect(
350 () =>
351 new FixedThreadPool(
352 numberOfWorkers,
353 './tests/worker-files/thread/testWorker.mjs',
354 {
355 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
356 }
357 )
358 ).toThrow(
359 new Error(
360 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
361 )
362 )
363 expect(
364 () =>
365 new FixedThreadPool(
366 numberOfWorkers,
367 './tests/worker-files/thread/testWorker.mjs',
368 {
369 enableTasksQueue: true,
370 tasksQueueOptions: 'invalidTasksQueueOptions'
371 }
372 )
373 ).toThrow(
374 new TypeError('Invalid tasks queue options: must be a plain object')
375 )
376 expect(
377 () =>
378 new FixedThreadPool(
379 numberOfWorkers,
380 './tests/worker-files/thread/testWorker.mjs',
381 {
382 enableTasksQueue: true,
383 tasksQueueOptions: { concurrency: 0 }
384 }
385 )
386 ).toThrow(
387 new RangeError(
388 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
389 )
390 )
391 expect(
392 () =>
393 new FixedThreadPool(
394 numberOfWorkers,
395 './tests/worker-files/thread/testWorker.mjs',
396 {
397 enableTasksQueue: true,
398 tasksQueueOptions: { concurrency: -1 }
399 }
400 )
401 ).toThrow(
402 new RangeError(
403 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
404 )
405 )
406 expect(
407 () =>
408 new FixedThreadPool(
409 numberOfWorkers,
410 './tests/worker-files/thread/testWorker.mjs',
411 {
412 enableTasksQueue: true,
413 tasksQueueOptions: { concurrency: 0.2 }
414 }
415 )
416 ).toThrow(
417 new TypeError('Invalid worker node tasks concurrency: must be an integer')
418 )
419 expect(
420 () =>
421 new FixedThreadPool(
422 numberOfWorkers,
423 './tests/worker-files/thread/testWorker.mjs',
424 {
425 enableTasksQueue: true,
426 tasksQueueOptions: { size: 0 }
427 }
428 )
429 ).toThrow(
430 new RangeError(
431 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
432 )
433 )
434 expect(
435 () =>
436 new FixedThreadPool(
437 numberOfWorkers,
438 './tests/worker-files/thread/testWorker.mjs',
439 {
440 enableTasksQueue: true,
441 tasksQueueOptions: { size: -1 }
442 }
443 )
444 ).toThrow(
445 new RangeError(
446 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
447 )
448 )
449 expect(
450 () =>
451 new FixedThreadPool(
452 numberOfWorkers,
453 './tests/worker-files/thread/testWorker.mjs',
454 {
455 enableTasksQueue: true,
456 tasksQueueOptions: { size: 0.2 }
457 }
458 )
459 ).toThrow(
460 new TypeError('Invalid worker node tasks queue size: must be an integer')
461 )
462 })
463
464 it('Verify that pool worker choice strategy options can be set', async () => {
465 const pool = new FixedThreadPool(
466 numberOfWorkers,
467 './tests/worker-files/thread/testWorker.mjs',
468 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
469 )
470 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
471 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
472 retries:
473 pool.info.maxSize +
474 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
475 runTime: { median: false },
476 waitTime: { median: false },
477 elu: { median: false },
478 weights: expect.objectContaining({
479 0: expect.any(Number),
480 [pool.info.maxSize - 1]: expect.any(Number)
481 })
482 })
483 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
484 .workerChoiceStrategies) {
485 expect(workerChoiceStrategy.opts).toStrictEqual(
486 expect.objectContaining({
487 retries:
488 pool.info.maxSize +
489 Object.keys(workerChoiceStrategy.opts.weights).length,
490 runTime: { median: false },
491 waitTime: { median: false },
492 elu: { median: false }
493 })
494 )
495 }
496 expect(
497 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
498 ).toStrictEqual({
499 runTime: {
500 aggregate: true,
501 average: true,
502 median: false
503 },
504 waitTime: {
505 aggregate: false,
506 average: false,
507 median: false
508 },
509 elu: {
510 aggregate: true,
511 average: true,
512 median: false
513 }
514 })
515 pool.setWorkerChoiceStrategyOptions({
516 runTime: { median: true },
517 elu: { median: true }
518 })
519 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
520 runTime: { median: true },
521 elu: { median: true }
522 })
523 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
524 retries:
525 pool.info.maxSize +
526 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
533 })
534 })
535 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
536 .workerChoiceStrategies) {
537 expect(workerChoiceStrategy.opts).toStrictEqual(
538 expect.objectContaining({
539 retries:
540 pool.info.maxSize +
541 Object.keys(workerChoiceStrategy.opts.weights).length,
542 runTime: { median: true },
543 waitTime: { median: false },
544 elu: { median: true }
545 })
546 )
547 }
548 expect(
549 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
550 ).toStrictEqual({
551 runTime: {
552 aggregate: true,
553 average: false,
554 median: true
555 },
556 waitTime: {
557 aggregate: false,
558 average: false,
559 median: false
560 },
561 elu: {
562 aggregate: true,
563 average: false,
564 median: true
565 }
566 })
567 pool.setWorkerChoiceStrategyOptions({
568 runTime: { median: false },
569 elu: { median: false }
570 })
571 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
572 runTime: { median: false },
573 elu: { median: false }
574 })
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
576 retries:
577 pool.info.maxSize +
578 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
579 runTime: { median: false },
580 waitTime: { median: false },
581 elu: { median: false },
582 weights: expect.objectContaining({
583 0: expect.any(Number),
584 [pool.info.maxSize - 1]: expect.any(Number)
585 })
586 })
587 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
588 .workerChoiceStrategies) {
589 expect(workerChoiceStrategy.opts).toStrictEqual(
590 expect.objectContaining({
591 retries:
592 pool.info.maxSize +
593 Object.keys(workerChoiceStrategy.opts.weights).length,
594 runTime: { median: false },
595 waitTime: { median: false },
596 elu: { median: false }
597 })
598 )
599 }
600 expect(
601 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
602 ).toStrictEqual({
603 runTime: {
604 aggregate: true,
605 average: true,
606 median: false
607 },
608 waitTime: {
609 aggregate: false,
610 average: false,
611 median: false
612 },
613 elu: {
614 aggregate: true,
615 average: true,
616 median: false
617 }
618 })
619 expect(() =>
620 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
621 ).toThrow(
622 new TypeError(
623 'Invalid worker choice strategy options: must be a plain object'
624 )
625 )
626 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
627 new Error(
628 'Invalid worker choice strategy options: must have a weight for each worker node'
629 )
630 )
631 expect(() =>
632 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
633 ).toThrow(
634 new Error(
635 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
636 )
637 )
638 await pool.destroy()
639 })
640
641 it('Verify that pool tasks queue can be enabled/disabled', async () => {
642 const pool = new FixedThreadPool(
643 numberOfWorkers,
644 './tests/worker-files/thread/testWorker.mjs'
645 )
646 expect(pool.opts.enableTasksQueue).toBe(false)
647 expect(pool.opts.tasksQueueOptions).toBeUndefined()
648 pool.enableTasksQueue(true)
649 expect(pool.opts.enableTasksQueue).toBe(true)
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 concurrency: 1,
652 size: Math.pow(numberOfWorkers, 2),
653 taskStealing: true,
654 tasksStealingOnBackPressure: true,
655 tasksFinishedTimeout: 2000
656 })
657 pool.enableTasksQueue(true, { concurrency: 2 })
658 expect(pool.opts.enableTasksQueue).toBe(true)
659 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 concurrency: 2,
661 size: Math.pow(numberOfWorkers, 2),
662 taskStealing: true,
663 tasksStealingOnBackPressure: true,
664 tasksFinishedTimeout: 2000
665 })
666 pool.enableTasksQueue(false)
667 expect(pool.opts.enableTasksQueue).toBe(false)
668 expect(pool.opts.tasksQueueOptions).toBeUndefined()
669 await pool.destroy()
670 })
671
672 it('Verify that pool tasks queue options can be set', async () => {
673 const pool = new FixedThreadPool(
674 numberOfWorkers,
675 './tests/worker-files/thread/testWorker.mjs',
676 { enableTasksQueue: true }
677 )
678 expect(pool.opts.tasksQueueOptions).toStrictEqual({
679 concurrency: 1,
680 size: Math.pow(numberOfWorkers, 2),
681 taskStealing: true,
682 tasksStealingOnBackPressure: true,
683 tasksFinishedTimeout: 2000
684 })
685 for (const workerNode of pool.workerNodes) {
686 expect(workerNode.tasksQueueBackPressureSize).toBe(
687 pool.opts.tasksQueueOptions.size
688 )
689 }
690 pool.setTasksQueueOptions({
691 concurrency: 2,
692 size: 2,
693 taskStealing: false,
694 tasksStealingOnBackPressure: false,
695 tasksFinishedTimeout: 3000
696 })
697 expect(pool.opts.tasksQueueOptions).toStrictEqual({
698 concurrency: 2,
699 size: 2,
700 taskStealing: false,
701 tasksStealingOnBackPressure: false,
702 tasksFinishedTimeout: 3000
703 })
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.tasksQueueBackPressureSize).toBe(
706 pool.opts.tasksQueueOptions.size
707 )
708 }
709 pool.setTasksQueueOptions({
710 concurrency: 1,
711 taskStealing: true,
712 tasksStealingOnBackPressure: true
713 })
714 expect(pool.opts.tasksQueueOptions).toStrictEqual({
715 concurrency: 1,
716 size: Math.pow(numberOfWorkers, 2),
717 taskStealing: true,
718 tasksStealingOnBackPressure: true,
719 tasksFinishedTimeout: 2000
720 })
721 for (const workerNode of pool.workerNodes) {
722 expect(workerNode.tasksQueueBackPressureSize).toBe(
723 pool.opts.tasksQueueOptions.size
724 )
725 }
726 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
727 new TypeError('Invalid tasks queue options: must be a plain object')
728 )
729 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
730 new RangeError(
731 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
732 )
733 )
734 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
735 new RangeError(
736 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
737 )
738 )
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
740 new TypeError('Invalid worker node tasks concurrency: must be an integer')
741 )
742 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
743 new RangeError(
744 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
745 )
746 )
747 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
748 new RangeError(
749 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
750 )
751 )
752 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
753 new TypeError('Invalid worker node tasks queue size: must be an integer')
754 )
755 await pool.destroy()
756 })
757
758 it('Verify that pool info is set', async () => {
759 let pool = new FixedThreadPool(
760 numberOfWorkers,
761 './tests/worker-files/thread/testWorker.mjs'
762 )
763 expect(pool.info).toStrictEqual({
764 version,
765 type: PoolTypes.fixed,
766 worker: WorkerTypes.thread,
767 started: true,
768 ready: true,
769 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
770 minSize: numberOfWorkers,
771 maxSize: numberOfWorkers,
772 workerNodes: numberOfWorkers,
773 idleWorkerNodes: numberOfWorkers,
774 busyWorkerNodes: 0,
775 executedTasks: 0,
776 executingTasks: 0,
777 failedTasks: 0
778 })
779 await pool.destroy()
780 pool = new DynamicClusterPool(
781 Math.floor(numberOfWorkers / 2),
782 numberOfWorkers,
783 './tests/worker-files/cluster/testWorker.js'
784 )
785 expect(pool.info).toStrictEqual({
786 version,
787 type: PoolTypes.dynamic,
788 worker: WorkerTypes.cluster,
789 started: true,
790 ready: true,
791 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
792 minSize: Math.floor(numberOfWorkers / 2),
793 maxSize: numberOfWorkers,
794 workerNodes: Math.floor(numberOfWorkers / 2),
795 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
796 busyWorkerNodes: 0,
797 executedTasks: 0,
798 executingTasks: 0,
799 failedTasks: 0
800 })
801 await pool.destroy()
802 })
803
804 it('Verify that pool worker tasks usage are initialized', async () => {
805 const pool = new FixedClusterPool(
806 numberOfWorkers,
807 './tests/worker-files/cluster/testWorker.js'
808 )
809 for (const workerNode of pool.workerNodes) {
810 expect(workerNode).toBeInstanceOf(WorkerNode)
811 expect(workerNode.usage).toStrictEqual({
812 tasks: {
813 executed: 0,
814 executing: 0,
815 queued: 0,
816 maxQueued: 0,
817 sequentiallyStolen: 0,
818 stolen: 0,
819 failed: 0
820 },
821 runTime: {
822 history: new CircularArray()
823 },
824 waitTime: {
825 history: new CircularArray()
826 },
827 elu: {
828 idle: {
829 history: new CircularArray()
830 },
831 active: {
832 history: new CircularArray()
833 }
834 }
835 })
836 }
837 await pool.destroy()
838 })
839
840 it('Verify that pool worker tasks queue are initialized', async () => {
841 let pool = new FixedClusterPool(
842 numberOfWorkers,
843 './tests/worker-files/cluster/testWorker.js'
844 )
845 for (const workerNode of pool.workerNodes) {
846 expect(workerNode).toBeInstanceOf(WorkerNode)
847 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
848 expect(workerNode.tasksQueue.size).toBe(0)
849 expect(workerNode.tasksQueue.maxSize).toBe(0)
850 }
851 await pool.destroy()
852 pool = new DynamicThreadPool(
853 Math.floor(numberOfWorkers / 2),
854 numberOfWorkers,
855 './tests/worker-files/thread/testWorker.mjs'
856 )
857 for (const workerNode of pool.workerNodes) {
858 expect(workerNode).toBeInstanceOf(WorkerNode)
859 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
860 expect(workerNode.tasksQueue.size).toBe(0)
861 expect(workerNode.tasksQueue.maxSize).toBe(0)
862 }
863 await pool.destroy()
864 })
865
866 it('Verify that pool worker info are initialized', async () => {
867 let pool = new FixedClusterPool(
868 numberOfWorkers,
869 './tests/worker-files/cluster/testWorker.js'
870 )
871 for (const workerNode of pool.workerNodes) {
872 expect(workerNode).toBeInstanceOf(WorkerNode)
873 expect(workerNode.info).toStrictEqual({
874 id: expect.any(Number),
875 type: WorkerTypes.cluster,
876 dynamic: false,
877 ready: true
878 })
879 }
880 await pool.destroy()
881 pool = new DynamicThreadPool(
882 Math.floor(numberOfWorkers / 2),
883 numberOfWorkers,
884 './tests/worker-files/thread/testWorker.mjs'
885 )
886 for (const workerNode of pool.workerNodes) {
887 expect(workerNode).toBeInstanceOf(WorkerNode)
888 expect(workerNode.info).toStrictEqual({
889 id: expect.any(Number),
890 type: WorkerTypes.thread,
891 dynamic: false,
892 ready: true
893 })
894 }
895 await pool.destroy()
896 })
897
898 it('Verify that pool statuses are checked at start or destroy', async () => {
899 const pool = new FixedThreadPool(
900 numberOfWorkers,
901 './tests/worker-files/thread/testWorker.mjs'
902 )
903 expect(pool.info.started).toBe(true)
904 expect(pool.info.ready).toBe(true)
905 expect(() => pool.start()).toThrow(
906 new Error('Cannot start an already started pool')
907 )
908 await pool.destroy()
909 expect(pool.info.started).toBe(false)
910 expect(pool.info.ready).toBe(false)
911 await expect(pool.destroy()).rejects.toThrow(
912 new Error('Cannot destroy an already destroyed pool')
913 )
914 })
915
916 it('Verify that pool can be started after initialization', async () => {
917 const pool = new FixedClusterPool(
918 numberOfWorkers,
919 './tests/worker-files/cluster/testWorker.js',
920 {
921 startWorkers: false
922 }
923 )
924 expect(pool.info.started).toBe(false)
925 expect(pool.info.ready).toBe(false)
926 expect(pool.readyEventEmitted).toBe(false)
927 expect(pool.workerNodes).toStrictEqual([])
928 await expect(pool.execute()).rejects.toThrow(
929 new Error('Cannot execute a task on not started pool')
930 )
931 pool.start()
932 expect(pool.info.started).toBe(true)
933 expect(pool.info.ready).toBe(true)
934 await waitPoolEvents(pool, PoolEvents.ready, 1)
935 expect(pool.readyEventEmitted).toBe(true)
936 expect(pool.workerNodes.length).toBe(numberOfWorkers)
937 for (const workerNode of pool.workerNodes) {
938 expect(workerNode).toBeInstanceOf(WorkerNode)
939 }
940 await pool.destroy()
941 })
942
943 it('Verify that pool execute() arguments are checked', async () => {
944 const pool = new FixedClusterPool(
945 numberOfWorkers,
946 './tests/worker-files/cluster/testWorker.js'
947 )
948 await expect(pool.execute(undefined, 0)).rejects.toThrow(
949 new TypeError('name argument must be a string')
950 )
951 await expect(pool.execute(undefined, '')).rejects.toThrow(
952 new TypeError('name argument must not be an empty string')
953 )
954 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
955 new TypeError('transferList argument must be an array')
956 )
957 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
958 "Task function 'unknown' not found"
959 )
960 await pool.destroy()
961 await expect(pool.execute()).rejects.toThrow(
962 new Error('Cannot execute a task on not started pool')
963 )
964 })
965
966 it('Verify that pool worker tasks usage are computed', async () => {
967 const pool = new FixedClusterPool(
968 numberOfWorkers,
969 './tests/worker-files/cluster/testWorker.js'
970 )
971 const promises = new Set()
972 const maxMultiplier = 2
973 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
974 promises.add(pool.execute())
975 }
976 for (const workerNode of pool.workerNodes) {
977 expect(workerNode.usage).toStrictEqual({
978 tasks: {
979 executed: 0,
980 executing: maxMultiplier,
981 queued: 0,
982 maxQueued: 0,
983 sequentiallyStolen: 0,
984 stolen: 0,
985 failed: 0
986 },
987 runTime: {
988 history: expect.any(CircularArray)
989 },
990 waitTime: {
991 history: expect.any(CircularArray)
992 },
993 elu: {
994 idle: {
995 history: expect.any(CircularArray)
996 },
997 active: {
998 history: expect.any(CircularArray)
999 }
1000 }
1001 })
1002 }
1003 await Promise.all(promises)
1004 for (const workerNode of pool.workerNodes) {
1005 expect(workerNode.usage).toStrictEqual({
1006 tasks: {
1007 executed: maxMultiplier,
1008 executing: 0,
1009 queued: 0,
1010 maxQueued: 0,
1011 sequentiallyStolen: 0,
1012 stolen: 0,
1013 failed: 0
1014 },
1015 runTime: {
1016 history: expect.any(CircularArray)
1017 },
1018 waitTime: {
1019 history: expect.any(CircularArray)
1020 },
1021 elu: {
1022 idle: {
1023 history: expect.any(CircularArray)
1024 },
1025 active: {
1026 history: expect.any(CircularArray)
1027 }
1028 }
1029 })
1030 }
1031 await pool.destroy()
1032 })
1033
1034 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1035 const pool = new DynamicThreadPool(
1036 Math.floor(numberOfWorkers / 2),
1037 numberOfWorkers,
1038 './tests/worker-files/thread/testWorker.mjs'
1039 )
1040 const promises = new Set()
1041 const maxMultiplier = 2
1042 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1043 promises.add(pool.execute())
1044 }
1045 await Promise.all(promises)
1046 for (const workerNode of pool.workerNodes) {
1047 expect(workerNode.usage).toStrictEqual({
1048 tasks: {
1049 executed: expect.any(Number),
1050 executing: 0,
1051 queued: 0,
1052 maxQueued: 0,
1053 sequentiallyStolen: 0,
1054 stolen: 0,
1055 failed: 0
1056 },
1057 runTime: {
1058 history: expect.any(CircularArray)
1059 },
1060 waitTime: {
1061 history: expect.any(CircularArray)
1062 },
1063 elu: {
1064 idle: {
1065 history: expect.any(CircularArray)
1066 },
1067 active: {
1068 history: expect.any(CircularArray)
1069 }
1070 }
1071 })
1072 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1073 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1074 numberOfWorkers * maxMultiplier
1075 )
1076 expect(workerNode.usage.runTime.history.length).toBe(0)
1077 expect(workerNode.usage.waitTime.history.length).toBe(0)
1078 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1079 expect(workerNode.usage.elu.active.history.length).toBe(0)
1080 }
1081 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1082 for (const workerNode of pool.workerNodes) {
1083 expect(workerNode.usage).toStrictEqual({
1084 tasks: {
1085 executed: 0,
1086 executing: 0,
1087 queued: 0,
1088 maxQueued: 0,
1089 sequentiallyStolen: 0,
1090 stolen: 0,
1091 failed: 0
1092 },
1093 runTime: {
1094 history: expect.any(CircularArray)
1095 },
1096 waitTime: {
1097 history: expect.any(CircularArray)
1098 },
1099 elu: {
1100 idle: {
1101 history: expect.any(CircularArray)
1102 },
1103 active: {
1104 history: expect.any(CircularArray)
1105 }
1106 }
1107 })
1108 expect(workerNode.usage.runTime.history.length).toBe(0)
1109 expect(workerNode.usage.waitTime.history.length).toBe(0)
1110 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1111 expect(workerNode.usage.elu.active.history.length).toBe(0)
1112 }
1113 await pool.destroy()
1114 })
1115
1116 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1117 const pool = new DynamicClusterPool(
1118 Math.floor(numberOfWorkers / 2),
1119 numberOfWorkers,
1120 './tests/worker-files/cluster/testWorker.js'
1121 )
1122 expect(pool.emitter.eventNames()).toStrictEqual([])
1123 let poolInfo
1124 let poolReady = 0
1125 pool.emitter.on(PoolEvents.ready, info => {
1126 ++poolReady
1127 poolInfo = info
1128 })
1129 await waitPoolEvents(pool, PoolEvents.ready, 1)
1130 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1131 expect(poolReady).toBe(1)
1132 expect(poolInfo).toStrictEqual({
1133 version,
1134 type: PoolTypes.dynamic,
1135 worker: WorkerTypes.cluster,
1136 started: true,
1137 ready: true,
1138 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1139 minSize: expect.any(Number),
1140 maxSize: expect.any(Number),
1141 workerNodes: expect.any(Number),
1142 idleWorkerNodes: expect.any(Number),
1143 busyWorkerNodes: expect.any(Number),
1144 executedTasks: expect.any(Number),
1145 executingTasks: expect.any(Number),
1146 failedTasks: expect.any(Number)
1147 })
1148 await pool.destroy()
1149 })
1150
1151 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1152 const pool = new FixedThreadPool(
1153 numberOfWorkers,
1154 './tests/worker-files/thread/testWorker.mjs'
1155 )
1156 expect(pool.emitter.eventNames()).toStrictEqual([])
1157 const promises = new Set()
1158 let poolBusy = 0
1159 let poolInfo
1160 pool.emitter.on(PoolEvents.busy, info => {
1161 ++poolBusy
1162 poolInfo = info
1163 })
1164 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1165 for (let i = 0; i < numberOfWorkers * 2; i++) {
1166 promises.add(pool.execute())
1167 }
1168 await Promise.all(promises)
1169 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1170 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1171 expect(poolBusy).toBe(numberOfWorkers + 1)
1172 expect(poolInfo).toStrictEqual({
1173 version,
1174 type: PoolTypes.fixed,
1175 worker: WorkerTypes.thread,
1176 started: true,
1177 ready: true,
1178 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1179 minSize: expect.any(Number),
1180 maxSize: expect.any(Number),
1181 workerNodes: expect.any(Number),
1182 idleWorkerNodes: expect.any(Number),
1183 busyWorkerNodes: expect.any(Number),
1184 executedTasks: expect.any(Number),
1185 executingTasks: expect.any(Number),
1186 failedTasks: expect.any(Number)
1187 })
1188 await pool.destroy()
1189 })
1190
1191 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1192 const pool = new DynamicThreadPool(
1193 Math.floor(numberOfWorkers / 2),
1194 numberOfWorkers,
1195 './tests/worker-files/thread/testWorker.mjs'
1196 )
1197 expect(pool.emitter.eventNames()).toStrictEqual([])
1198 const promises = new Set()
1199 let poolFull = 0
1200 let poolInfo
1201 pool.emitter.on(PoolEvents.full, info => {
1202 ++poolFull
1203 poolInfo = info
1204 })
1205 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1206 for (let i = 0; i < numberOfWorkers * 2; i++) {
1207 promises.add(pool.execute())
1208 }
1209 await Promise.all(promises)
1210 expect(poolFull).toBe(1)
1211 expect(poolInfo).toStrictEqual({
1212 version,
1213 type: PoolTypes.dynamic,
1214 worker: WorkerTypes.thread,
1215 started: true,
1216 ready: true,
1217 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1218 minSize: expect.any(Number),
1219 maxSize: expect.any(Number),
1220 workerNodes: expect.any(Number),
1221 idleWorkerNodes: expect.any(Number),
1222 busyWorkerNodes: expect.any(Number),
1223 executedTasks: expect.any(Number),
1224 executingTasks: expect.any(Number),
1225 failedTasks: expect.any(Number)
1226 })
1227 await pool.destroy()
1228 })
1229
1230 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1231 const pool = new FixedThreadPool(
1232 numberOfWorkers,
1233 './tests/worker-files/thread/testWorker.mjs',
1234 {
1235 enableTasksQueue: true
1236 }
1237 )
1238 stub(pool, 'hasBackPressure').returns(true)
1239 expect(pool.emitter.eventNames()).toStrictEqual([])
1240 const promises = new Set()
1241 let poolBackPressure = 0
1242 let poolInfo
1243 pool.emitter.on(PoolEvents.backPressure, info => {
1244 ++poolBackPressure
1245 poolInfo = info
1246 })
1247 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1248 for (let i = 0; i < numberOfWorkers + 1; i++) {
1249 promises.add(pool.execute())
1250 }
1251 await Promise.all(promises)
1252 expect(poolBackPressure).toBe(1)
1253 expect(poolInfo).toStrictEqual({
1254 version,
1255 type: PoolTypes.fixed,
1256 worker: WorkerTypes.thread,
1257 started: true,
1258 ready: true,
1259 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1260 minSize: expect.any(Number),
1261 maxSize: expect.any(Number),
1262 workerNodes: expect.any(Number),
1263 idleWorkerNodes: expect.any(Number),
1264 busyWorkerNodes: expect.any(Number),
1265 executedTasks: expect.any(Number),
1266 executingTasks: expect.any(Number),
1267 maxQueuedTasks: expect.any(Number),
1268 queuedTasks: expect.any(Number),
1269 backPressure: true,
1270 stolenTasks: expect.any(Number),
1271 failedTasks: expect.any(Number)
1272 })
1273 expect(pool.hasBackPressure.callCount).toBe(5)
1274 await pool.destroy()
1275 })
1276
1277 it('Verify that destroy() waits for queued tasks to finish', async () => {
1278 const tasksFinishedTimeout = 2500
1279 const pool = new FixedThreadPool(
1280 numberOfWorkers,
1281 './tests/worker-files/thread/asyncWorker.mjs',
1282 {
1283 enableTasksQueue: true,
1284 tasksQueueOptions: { tasksFinishedTimeout }
1285 }
1286 )
1287 const maxMultiplier = 4
1288 let tasksFinished = 0
1289 for (const workerNode of pool.workerNodes) {
1290 workerNode.on('taskFinished', () => {
1291 ++tasksFinished
1292 })
1293 }
1294 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1295 pool.execute()
1296 }
1297 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1298 const startTime = performance.now()
1299 await pool.destroy()
1300 const elapsedTime = performance.now() - startTime
1301 expect(tasksFinished).toBeGreaterThanOrEqual(numberOfWorkers)
1302 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1303 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1304 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1305 })
1306
1307 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1308 const tasksFinishedTimeout = 1000
1309 const pool = new FixedThreadPool(
1310 numberOfWorkers,
1311 './tests/worker-files/thread/asyncWorker.mjs',
1312 {
1313 enableTasksQueue: true,
1314 tasksQueueOptions: { tasksFinishedTimeout }
1315 }
1316 )
1317 const maxMultiplier = 4
1318 let tasksFinished = 0
1319 for (const workerNode of pool.workerNodes) {
1320 workerNode.on('taskFinished', () => {
1321 ++tasksFinished
1322 })
1323 }
1324 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1325 pool.execute()
1326 }
1327 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1328 const startTime = performance.now()
1329 await pool.destroy()
1330 const elapsedTime = performance.now() - startTime
1331 expect(tasksFinished).toBe(0)
1332 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1333 })
1334
1335 it('Verify that pool asynchronous resource track tasks execution', async () => {
1336 let taskAsyncId
1337 let initCalls = 0
1338 let beforeCalls = 0
1339 let afterCalls = 0
1340 let resolveCalls = 0
1341 const hook = createHook({
1342 init (asyncId, type) {
1343 if (type === 'poolifier:task') {
1344 initCalls++
1345 taskAsyncId = asyncId
1346 }
1347 },
1348 before (asyncId) {
1349 if (asyncId === taskAsyncId) beforeCalls++
1350 },
1351 after (asyncId) {
1352 if (asyncId === taskAsyncId) afterCalls++
1353 },
1354 promiseResolve () {
1355 if (executionAsyncId() === taskAsyncId) resolveCalls++
1356 }
1357 })
1358 const pool = new FixedThreadPool(
1359 numberOfWorkers,
1360 './tests/worker-files/thread/testWorker.mjs'
1361 )
1362 hook.enable()
1363 await pool.execute()
1364 hook.disable()
1365 expect(initCalls).toBe(1)
1366 expect(beforeCalls).toBe(1)
1367 expect(afterCalls).toBe(1)
1368 expect(resolveCalls).toBe(1)
1369 await pool.destroy()
1370 })
1371
1372 it('Verify that hasTaskFunction() is working', async () => {
1373 const dynamicThreadPool = new DynamicThreadPool(
1374 Math.floor(numberOfWorkers / 2),
1375 numberOfWorkers,
1376 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1377 )
1378 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1379 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1380 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1381 true
1382 )
1383 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1384 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1385 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1386 await dynamicThreadPool.destroy()
1387 const fixedClusterPool = new FixedClusterPool(
1388 numberOfWorkers,
1389 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1390 )
1391 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1392 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1393 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1394 true
1395 )
1396 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1397 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1398 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1399 await fixedClusterPool.destroy()
1400 })
1401
1402 it('Verify that addTaskFunction() is working', async () => {
1403 const dynamicThreadPool = new DynamicThreadPool(
1404 Math.floor(numberOfWorkers / 2),
1405 numberOfWorkers,
1406 './tests/worker-files/thread/testWorker.mjs'
1407 )
1408 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1409 await expect(
1410 dynamicThreadPool.addTaskFunction(0, () => {})
1411 ).rejects.toThrow(new TypeError('name argument must be a string'))
1412 await expect(
1413 dynamicThreadPool.addTaskFunction('', () => {})
1414 ).rejects.toThrow(
1415 new TypeError('name argument must not be an empty string')
1416 )
1417 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1418 new TypeError('fn argument must be a function')
1419 )
1420 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1421 new TypeError('fn argument must be a function')
1422 )
1423 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1424 DEFAULT_TASK_NAME,
1425 'test'
1426 ])
1427 const echoTaskFunction = data => {
1428 return data
1429 }
1430 await expect(
1431 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1432 ).resolves.toBe(true)
1433 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1434 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1435 echoTaskFunction
1436 )
1437 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1438 DEFAULT_TASK_NAME,
1439 'test',
1440 'echo'
1441 ])
1442 const taskFunctionData = { test: 'test' }
1443 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1444 expect(echoResult).toStrictEqual(taskFunctionData)
1445 for (const workerNode of dynamicThreadPool.workerNodes) {
1446 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1447 tasks: {
1448 executed: expect.any(Number),
1449 executing: 0,
1450 queued: 0,
1451 sequentiallyStolen: 0,
1452 stolen: 0,
1453 failed: 0
1454 },
1455 runTime: {
1456 history: new CircularArray()
1457 },
1458 waitTime: {
1459 history: new CircularArray()
1460 },
1461 elu: {
1462 idle: {
1463 history: new CircularArray()
1464 },
1465 active: {
1466 history: new CircularArray()
1467 }
1468 }
1469 })
1470 }
1471 await dynamicThreadPool.destroy()
1472 })
1473
1474 it('Verify that removeTaskFunction() is working', async () => {
1475 const dynamicThreadPool = new DynamicThreadPool(
1476 Math.floor(numberOfWorkers / 2),
1477 numberOfWorkers,
1478 './tests/worker-files/thread/testWorker.mjs'
1479 )
1480 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1481 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1482 DEFAULT_TASK_NAME,
1483 'test'
1484 ])
1485 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1486 new Error('Cannot remove a task function not handled on the pool side')
1487 )
1488 const echoTaskFunction = data => {
1489 return data
1490 }
1491 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1492 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1493 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1494 echoTaskFunction
1495 )
1496 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1497 DEFAULT_TASK_NAME,
1498 'test',
1499 'echo'
1500 ])
1501 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1502 true
1503 )
1504 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1505 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1506 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1507 DEFAULT_TASK_NAME,
1508 'test'
1509 ])
1510 await dynamicThreadPool.destroy()
1511 })
1512
1513 it('Verify that listTaskFunctionNames() is working', async () => {
1514 const dynamicThreadPool = new DynamicThreadPool(
1515 Math.floor(numberOfWorkers / 2),
1516 numberOfWorkers,
1517 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1518 )
1519 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1520 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1521 DEFAULT_TASK_NAME,
1522 'jsonIntegerSerialization',
1523 'factorial',
1524 'fibonacci'
1525 ])
1526 await dynamicThreadPool.destroy()
1527 const fixedClusterPool = new FixedClusterPool(
1528 numberOfWorkers,
1529 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1530 )
1531 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1532 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1533 DEFAULT_TASK_NAME,
1534 'jsonIntegerSerialization',
1535 'factorial',
1536 'fibonacci'
1537 ])
1538 await fixedClusterPool.destroy()
1539 })
1540
1541 it('Verify that setDefaultTaskFunction() is working', async () => {
1542 const dynamicThreadPool = new DynamicThreadPool(
1543 Math.floor(numberOfWorkers / 2),
1544 numberOfWorkers,
1545 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1546 )
1547 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1548 const workerId = dynamicThreadPool.workerNodes[0].info.id
1549 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1550 new Error(
1551 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1552 )
1553 )
1554 await expect(
1555 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1556 ).rejects.toThrow(
1557 new Error(
1558 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1559 )
1560 )
1561 await expect(
1562 dynamicThreadPool.setDefaultTaskFunction('unknown')
1563 ).rejects.toThrow(
1564 new Error(
1565 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1566 )
1567 )
1568 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1569 DEFAULT_TASK_NAME,
1570 'jsonIntegerSerialization',
1571 'factorial',
1572 'fibonacci'
1573 ])
1574 await expect(
1575 dynamicThreadPool.setDefaultTaskFunction('factorial')
1576 ).resolves.toBe(true)
1577 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1578 DEFAULT_TASK_NAME,
1579 'factorial',
1580 'jsonIntegerSerialization',
1581 'fibonacci'
1582 ])
1583 await expect(
1584 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1585 ).resolves.toBe(true)
1586 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1587 DEFAULT_TASK_NAME,
1588 'fibonacci',
1589 'jsonIntegerSerialization',
1590 'factorial'
1591 ])
1592 await dynamicThreadPool.destroy()
1593 })
1594
1595 it('Verify that multiple task functions worker is working', async () => {
1596 const pool = new DynamicClusterPool(
1597 Math.floor(numberOfWorkers / 2),
1598 numberOfWorkers,
1599 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1600 )
1601 const data = { n: 10 }
1602 const result0 = await pool.execute(data)
1603 expect(result0).toStrictEqual({ ok: 1 })
1604 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1605 expect(result1).toStrictEqual({ ok: 1 })
1606 const result2 = await pool.execute(data, 'factorial')
1607 expect(result2).toBe(3628800)
1608 const result3 = await pool.execute(data, 'fibonacci')
1609 expect(result3).toBe(55)
1610 expect(pool.info.executingTasks).toBe(0)
1611 expect(pool.info.executedTasks).toBe(4)
1612 for (const workerNode of pool.workerNodes) {
1613 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1614 DEFAULT_TASK_NAME,
1615 'jsonIntegerSerialization',
1616 'factorial',
1617 'fibonacci'
1618 ])
1619 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1620 for (const name of pool.listTaskFunctionNames()) {
1621 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1622 tasks: {
1623 executed: expect.any(Number),
1624 executing: 0,
1625 failed: 0,
1626 queued: 0,
1627 sequentiallyStolen: 0,
1628 stolen: 0
1629 },
1630 runTime: {
1631 history: expect.any(CircularArray)
1632 },
1633 waitTime: {
1634 history: expect.any(CircularArray)
1635 },
1636 elu: {
1637 idle: {
1638 history: expect.any(CircularArray)
1639 },
1640 active: {
1641 history: expect.any(CircularArray)
1642 }
1643 }
1644 })
1645 expect(
1646 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1647 ).toBeGreaterThan(0)
1648 }
1649 expect(
1650 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1651 ).toStrictEqual(
1652 workerNode.getTaskFunctionWorkerUsage(
1653 workerNode.info.taskFunctionNames[1]
1654 )
1655 )
1656 }
1657 await pool.destroy()
1658 })
1659
1660 it('Verify sendKillMessageToWorker()', async () => {
1661 const pool = new DynamicClusterPool(
1662 Math.floor(numberOfWorkers / 2),
1663 numberOfWorkers,
1664 './tests/worker-files/cluster/testWorker.js'
1665 )
1666 const workerNodeKey = 0
1667 await expect(
1668 pool.sendKillMessageToWorker(workerNodeKey)
1669 ).resolves.toBeUndefined()
1670 await pool.destroy()
1671 })
1672
1673 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1674 const pool = new DynamicClusterPool(
1675 Math.floor(numberOfWorkers / 2),
1676 numberOfWorkers,
1677 './tests/worker-files/cluster/testWorker.js'
1678 )
1679 const workerNodeKey = 0
1680 await expect(
1681 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1682 taskFunctionOperation: 'add',
1683 taskFunctionName: 'empty',
1684 taskFunction: (() => {}).toString()
1685 })
1686 ).resolves.toBe(true)
1687 expect(
1688 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1689 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1690 await pool.destroy()
1691 })
1692
1693 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1694 const pool = new DynamicClusterPool(
1695 Math.floor(numberOfWorkers / 2),
1696 numberOfWorkers,
1697 './tests/worker-files/cluster/testWorker.js'
1698 )
1699 await expect(
1700 pool.sendTaskFunctionOperationToWorkers({
1701 taskFunctionOperation: 'add',
1702 taskFunctionName: 'empty',
1703 taskFunction: (() => {}).toString()
1704 })
1705 ).resolves.toBe(true)
1706 for (const workerNode of pool.workerNodes) {
1707 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1708 DEFAULT_TASK_NAME,
1709 'test',
1710 'empty'
1711 ])
1712 }
1713 await pool.destroy()
1714 })
1715 })