fix: put back properties init in constructor
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Version declared in the package.json located two directories above this test file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Worker count handed to the pools created by the tests below.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose isMain() always answers false.
 * Used by the test checking that a pool refuses to be created when it is
 * not reported as running from the main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
37
// Undo every sinon stub once a test has finished.
afterEach(function () {
  restore()
})
41
it('Verify that pool can be created and destroyed', async () => {
  // Minimal lifecycle: build a fixed thread pool, check its class, tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub pool class overrides isMain() to return false, so construction must throw.
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolOutsideMain).toThrow(expectedError)
})
67
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status property name -> value expected right after construction.
  const expectedStatuses = {
    started: true,
    starting: false,
    destroying: false
  }
  for (const [status, expected] of Object.entries(expectedStatuses)) {
    expect(pool[status]).toBe(expected)
  }
  await pool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // [pool factory, error the constructor must throw]
  const invalidFilePaths = [
    [
      () => new FixedThreadPool(numberOfWorkers),
      new TypeError('The worker file path must be specified')
    ],
    [
      () => new FixedThreadPool(numberOfWorkers, 0),
      new TypeError('The worker file path must be a string')
    ],
    [
      () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts'),
      new Error("Cannot find the worker file './dummyWorker.ts'")
    ]
  ]
  for (const [createPool, expectedError] of invalidFilePaths) {
    expect(createPool).toThrow(expectedError)
  }
})
90
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  expect(createPoolWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  // A size of -1 must be refused with a RangeError.
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  // A fractional size (0.25) must be refused with a TypeError.
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  expect(createFractionalSizedPool).toThrow(expectedError)
})
126
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool given a fourth (maximum size) argument must throw.
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: pool class, (min, max) sizes, worker file and the error the constructor must throw.
  const invalidSizings = [
    {
      PoolClass: DynamicClusterPool,
      min: 1,
      max: undefined,
      workerFile: clusterWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 0.5,
      max: 1,
      workerFile: threadWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      PoolClass: DynamicClusterPool,
      min: 0,
      max: 0.5,
      workerFile: clusterWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 2,
      max: 1,
      workerFile: threadWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 0,
      max: 0,
      workerFile: threadWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      PoolClass: DynamicClusterPool,
      min: 1,
      max: 1,
      workerFile: clusterWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { PoolClass, min, max, workerFile, expectedError } of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Part 1: pool built without options, checked against the expected defaults.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  // Expected strategy options, given the weights object the checked options carry.
  // Weights are only matched on their first and last worker node keys.
  const defaultStrategyOptions = weights => ({
    retries: defaultPool.info.maxSize + Object.keys(weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [defaultPool.info.maxSize - 1]: expect.any(Number)
    })
  })
  expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions(defaultPool.workerChoiceStrategyContext.opts.weights)
  )
  for (const [, strategy] of defaultPool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(
      expect.objectContaining(defaultStrategyOptions(strategy.opts.weights))
    )
  }
  await defaultPool.destroy()

  // Part 2: pool built with explicit options, which must be reflected as given.
  const testHandler = () => console.info('test handler executed')
  // Fresh object on every call so the pool never shares a reference with the expectation.
  const givenStrategyOptions = () => ({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: givenStrategyOptions(),
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: givenStrategyOptions(),
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  // Options expected on the strategy context and on every strategy it holds.
  const resolvedStrategyOptions = () => ({
    retries:
      customPool.info.maxSize +
      Object.keys(customPool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
    resolvedStrategyOptions()
  )
  for (const [, strategy] of customPool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(resolvedStrategyOptions())
  }
  await customPool.destroy()
})
327
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: the options object given to the constructor and the error it must throw.
  const invalidOptions = [
    {
      opts: { workerChoiceStrategy: 'invalidStrategy' },
      expectedError: new Error(
        "Invalid worker choice strategy 'invalidStrategy'"
      )
    },
    {
      opts: { workerChoiceStrategyOptions: { weights: {} } },
      expectedError: new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    },
    {
      opts: {
        workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
      },
      expectedError: new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    },
    {
      opts: {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions'
      },
      expectedError: new TypeError(
        'Invalid tasks queue options: must be a plain object'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      expectedError: new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      expectedError: new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      expectedError: new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      expectedError: new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      expectedError: new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    },
    {
      opts: { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      expectedError: new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
  ]
  for (const { opts, expectedError } of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, opts)
    ).toThrow(expectedError)
  }
})
467
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  /**
   * Asserts the strategy context options, every strategy's options and the
   * task statistics requirements for a given runTime/elu `median` setting.
   * The expectations use `average: !median` for runTime and elu.
   */
  const expectStrategyState = median => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
      retries:
        pool.info.maxSize +
        Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
      runTime: { median },
      waitTime: { median: false },
      elu: { median },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(
        expect.objectContaining({
          retries:
            pool.info.maxSize + Object.keys(strategy.opts.weights).length,
          runTime: { median },
          waitTime: { median: false },
          elu: { median }
        })
      )
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no strategy options stored on the pool, medians disabled.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategyState(false)

  // Switch medians on, then off again; the pool must mirror the options each time.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median },
      elu: { median }
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      runTime: { median },
      elu: { median }
    })
    expectStrategyState(median)
  }

  // Invalid options must be rejected with the matching error.
  const invalidOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
644
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Queue switched off: flag is false and no queue options are kept.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Queue switched on: flag is true and the options carry the given concurrency.
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    })
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
675
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Options expected when only `enableTasksQueue: true` (or concurrency 1) is requested.
  const baselineOptions = () => ({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Fully customised options, returned fresh so the pool and the expectation never share an object.
  const customOptions = () => ({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  // Asserts the pool options, then that every worker node back pressure size equals the queue size.
  const expectOptions = expected => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expected)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectOptions(baselineOptions())
  pool.setTasksQueueOptions(customOptions())
  expectOptions(customOptions())
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectOptions(baselineOptions())

  // Invalid options must be rejected with the matching error.
  const invalidOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [options, expectedError] of invalidOptions) {
    expect(() => pool.setTasksQueueOptions(options)).toThrow(expectedError)
  }
  await pool.destroy()
})
761
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both pools, none of which has run a task.
  const sharedInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool: every size-related field equals numberOfWorkers.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  // Dynamic cluster pool: worker node counts are expected to match the minimum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await dynamicPool.destroy()
})
807
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Measurement statistics expected on a new worker node: a newly constructed CircularArray history only.
  const initialStatistics = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: initialStatistics(),
      waitTime: initialStatistics(),
      elu: {
        idle: initialStatistics(),
        active: initialStatistics()
      }
    })
  }
  await pool.destroy()
})
843
it('Verify that pool worker tasks queue are initialized', async () => {
  // Checks that each worker node owns a Deque with size and maxSize of 0, then destroys the pool.
  const checkQueuesThenDestroy = async pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
    await pool.destroy()
  }

  await checkQueuesThenDestroy(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.js'
    )
  )
  await checkQueuesThenDestroy(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs'
    )
  )
})
869
it('Verify that pool worker info are initialized', async () => {
  // Checks every worker node info against the expected worker type, then destroys the pool.
  const checkInfoThenDestroy = async (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
    await pool.destroy()
  }

  await checkInfoThenDestroy(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testWorker.js'
    ),
    WorkerTypes.cluster
  )
  await checkInfoThenDestroy(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs'
    ),
    WorkerTypes.thread
  )
})
901
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // `started` and `ready` are expected to move together in this scenario.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(true)
  // Starting twice is an error.
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )
  await pool.destroy()
  expectStartedAndReady(false)
  // Destroying twice is an error as well, reported through the returned promise.
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
919
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // With startWorkers disabled: not started, no worker node, tasks refused.
  expectStartedAndReady(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )

  // After an explicit start the pool reports started/ready and emits its ready event.
  pool.start()
  expectStartedAndReady(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
946
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // [arguments given to execute(), TypeError the returned promise must reject with]
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, expectedError] of invalidCalls) {
    await expect(pool.execute(...args)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a string value, not an Error instance.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // A destroyed pool refuses further tasks.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
969
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage: all task counters at zero except the given overrides; histories may hold anything.
  const usageWith = taskCounters => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0,
      ...taskCounters
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  // Submit maxMultiplier tasks per worker without awaiting them.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While the tasks are in flight each worker node reports them as executing.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWith({ executing: maxMultiplier })
    )
  }
  await Promise.all(pendingTasks)
  // Once settled the same tasks are reported as executed.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWith({ executed: maxMultiplier })
    )
  }
  await pool.destroy()
})
1037
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  // Expected usage for an idle worker node with the given `executed` expectation.
  const idleUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // All four measurement histories must hold no entry.
  const expectEmptyHistories = ({ usage }) => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  await Promise.all(
    Array.from({ length: numberOfWorkers * maxMultiplier }, () =>
      pool.execute()
    )
  )
  // After the run every worker node has executed at least one task and at most all of them.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(idleUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(workerNode)
  }
  // Changing the strategy must bring the executed counter back to zero.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(idleUsage(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1119
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload received with the `ready` event, in order of arrival.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyInfos.length).toBe(1)
  // Size and counter fields are only required to be numbers.
  const numericFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'failedTasks'
  ]
  expect(readyInfos.at(-1)).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...Object.fromEntries(
      numericFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
1154
// Ensures a listener registered for the 'busy' pool event is invoked the
// expected number of times and receives the pool information.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event fires once the number of tasks submitted at once reaches
  // the number of fixed pool workers, hence numberOfWorkers + 1 emissions when
  // numberOfWorkers * 2 tasks are submitted in a single loop.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1194
// Ensures a listener registered for the 'full' pool event is invoked once
// while a dynamic pool receives numberOfWorkers * 2 tasks, and that it gets
// the pool information.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Every payload handed to the listener, in emission order.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, info => {
    fullInfos.push(info)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  const pendingTasks = []
  for (let taskIndex = 0; taskIndex < numberOfWorkers * 2; taskIndex++) {
    pendingTasks.push(pool.execute())
  }
  await Promise.all(pendingTasks)
  expect(fullInfos.length).toBe(1)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1233
// Ensures a listener registered for the 'backPressure' pool event is invoked
// once when hasBackPressure() is stubbed to always report back pressure.
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure regardless of its queues state.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureCount += 1
    lastInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressureCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  // Number of times the stubbed hasBackPressure() was consulted by the pool.
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
1280
// Ensures destroy() lets every submitted task finish when the tasks finished
// timeout is long enough, and that it returns within that timeout.
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let finishedCount = 0
  const countFinishedTask = () => {
    finishedCount += 1
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', countFinishedTask)
  })
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  expect(finishedCount).toBe(submittedTasks)
  expect(elapsedTime).toBeGreaterThanOrEqual(2000)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1309
// Ensures destroy() gives up waiting once the tasks finished timeout elapses:
// no task is reported as finished and destroy() returns shortly after it.
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let finishedCount = 0
  const countFinishedTask = () => {
    finishedCount += 1
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', countFinishedTask)
  })
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  expect(finishedCount).toBe(0)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1337
// Ensures one task execution is tracked by an asynchronous resource of type
// 'poolifier:task': the resource is initialized once, entered and exited
// once, and exactly one promise is resolved while executing inside it.
it('Verify that pool asynchronous resource track tasks execution', async () => {
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the id of the task resource so the other callbacks can
      // filter on it.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promises resolved while running in the task resource scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Async hooks are process-wide: always disable the hook, even when the
    // task execution rejects, so it cannot leak into the following tests.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1374
// Ensures hasTaskFunction() reports the task functions exposed by the
// multiple task functions workers, for both a thread and a cluster pool.
it('Verify that hasTaskFunction() is working', async () => {
  // [task function name, expected hasTaskFunction() result], checked in order.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [name, registered] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(name)).toBe(registered)
  }
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [name, registered] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(name)).toBe(registered)
  }
  await fixedClusterPool.destroy()
})
1404
// Ensures addTaskFunction() validates its arguments, registers the task
// function on the pool, and that the added task function can be executed.
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid task function names.
  await expect(
    dynamicThreadPool.addTaskFunction(0, () => {})
  ).rejects.toThrow(new TypeError('name argument must be a string'))
  await expect(
    dynamicThreadPool.addTaskFunction('', () => {})
  ).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )
  // Invalid task functions.
  await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
    new TypeError('fn argument must be a function')
  )
  await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
    new TypeError('fn argument must be a function')
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => data
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // Builds a measurement statistics entry holding a fresh, empty history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of dynamicThreadPool.workerNodes) {
    const echoUsage = workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1476
// Ensures removeTaskFunction() rejects for a task function that was not added
// through the pool and removes one that was added with addTaskFunction().
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Task function names expected before adding and after removing 'echo'.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1515
// Ensures listTaskFunctionNames() returns the task function names of the
// multiple task functions workers, for both a thread and a cluster pool.
it('Verify that listTaskFunctionNames() is working', async () => {
  // Same names, in the same order, are expected from both pools.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1543
// Ensures setDefaultTaskFunction() rejects invalid names and, on success,
// changes the order of the names returned by listTaskFunctionNames().
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // The expected error messages embed the id of the first worker node.
  const [firstWorkerNode] = dynamicThreadPool.workerNodes
  const workerId = firstWorkerNode.info.id
  const notAStringError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
  )
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    notAStringError
  )
  const reservedNameError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(reservedNameError)
  const nonExistingError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(nonExistingError)
  // The failed attempts above must leave the names order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1597
// Ensures a worker exposing several task functions executes each of them and
// that per task function usage is recorded on the worker nodes.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }
  const defaultResult = await pool.execute(input)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    input,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(input, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(input, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      const taskFunctionUsage =
        workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      expect(taskFunctionUsage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: { history: expect.any(CircularArray) },
        waitTime: { history: expect.any(CircularArray) },
        elu: {
          idle: { history: expect.any(CircularArray) },
          active: { history: expect.any(CircularArray) }
        }
      })
      expect(taskFunctionUsage.tasks.executed).toBeGreaterThan(0)
    }
    // The default task name usage must equal the usage of the task function
    // listed right after it in the worker node task function names.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    expect(defaultUsage).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(
        workerNode.info.taskFunctionNames[1]
      )
    )
  }
  await pool.destroy()
})
1662
// Ensures sendKillMessageToWorker() resolves for an existing worker node key
// and rejects for a worker node key the pool does not hold.
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  const killExisting = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killExisting).resolves.toBeUndefined()
  const invalidKeyError = new Error(
    `Invalid worker node key '${numberOfWorkers}'`
  )
  await expect(
    pool.sendKillMessageToWorker(numberOfWorkers)
  ).rejects.toStrictEqual(invalidKeyError)
  await pool.destroy()
})
1680
// Ensures sendTaskFunctionOperationToWorker() adds a task function on the
// targeted worker and that the worker node task function names reflect it.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // 'add' operation carrying the source text of an empty task function.
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addEmptyOperation)
  ).resolves.toBe(true)
  const targetedWorkerNode = pool.workerNodes[workerNodeKey]
  expect(targetedWorkerNode.info.taskFunctionNames).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'empty'
  ])
  await pool.destroy()
})
1700
// Ensures sendTaskFunctionOperationToWorkers() adds a task function and that
// every worker node task function names reflect it.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation carrying the source text of an empty task function.
  const addEmptyOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyOperation)
  ).resolves.toBe(true)
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1723 })