test: fix test expectation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Number of workers used to size the pools created by this test suite.
const numberOfWorkers = 2
// Version string read from the package.json located two directories above this test file.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '..', '..', 'package.json'),
    'utf8'
  )
)
/**
 * Fixed thread pool subclass whose `isMain()` always answers `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
37
// Call sinon's `restore()` once each test has finished.
afterEach(function () {
  restore()
})
41
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const fixedPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(fixedPool).toBeInstanceOf(FixedThreadPool)
  await fixedPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  // The stub pool reports `isMain()` as false, so its construction is expected to throw.
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      {
        errorHandler: e => console.error(e)
      }
    )
  expect(createStubPool).toThrow(expectedError)
})
67
it('Verify that pool statuses properties are set', async () => {
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status property name -> value expected right after construction.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expectedValue] of expectedStatuses) {
    expect(fixedPool[status]).toBe(expectedValue)
  }
  await fixedPool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // Constructor arguments -> error expected from the constructor.
  const invalidFilePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified')
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string')
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'")
    ]
  ]
  for (const [constructorArgs, expectedError] of invalidFilePathCases) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(
      expectedError
    )
  }
})
90
it('Verify that numberOfWorkers is checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  expect(createPoolWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  expect(createPoolWithFractionalSize).toThrow(expectedError)
})
126
it('Verify that pool arguments number and pool type are checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  // A fourth constructor argument is passed to a fixed pool on purpose.
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: pool class, minimum size, maximum size, worker file, expected constructor error.
  const invalidSizingCases = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [
    PoolClass,
    minimumSize,
    maximumSize,
    workerFile,
    expectedError
  ] of invalidSizingCases) {
    expect(
      () => new PoolClass(minimumSize, maximumSize, workerFile)
    ).toThrow(expectedError)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // First pool: no options supplied.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Builds the worker choice strategy options expected when none were supplied:
  // `retries` is the pool maximum size plus the number of weights found on the inspected options.
  const expectedDefaultStrategyOptions = actualOptions => ({
    retries: pool.info.maxSize + Object.keys(actualOptions.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedDefaultStrategyOptions(pool.workerChoiceStrategyContext.opts)
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(
      expectedDefaultStrategyOptions(strategy.opts)
    )
  }
  await pool.destroy()

  // Second pool: every option supplied explicitly.
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  // The same expectation applies to the context and to each of its strategies.
  const expectedCustomStrategyOptions = {
    retries:
      pool.info.maxSize +
      Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedCustomStrategyOptions
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(expectedCustomStrategyOptions)
  }
  await pool.destroy()
})
325
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: invalid pool options -> error expected from the constructor.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions'
      },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
465
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Returns a fresh options object each call so the object handed to the pool
  // is never the same reference as the one used in the expectation.
  const medianSettings = median => ({
    runTime: { median },
    elu: { median }
  })
  // Checks the context options, each strategy's options and the task statistics
  // requirements for a given run time / ELU `median` flag.
  const expectStrategyOptionsState = median => {
    const context = pool.workerChoiceStrategyContext
    expect(context.opts).toStrictEqual({
      retries: pool.info.maxSize + Object.keys(context.opts.weights).length,
      runTime: { median },
      waitTime: { median: false },
      elu: { median },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(
        expect.objectContaining({
          retries:
            pool.info.maxSize + Object.keys(strategy.opts.weights).length,
          runTime: { median },
          waitTime: { median: false },
          elu: { median }
        })
      )
    }
    expect(context.getTaskStatisticsRequirements()).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !median,
        median
      },
      waitTime: {
        aggregate: false,
        average: false,
        median: false
      },
      elu: {
        aggregate: true,
        average: !median,
        median
      }
    })
  }

  // Initial state: no strategy options were supplied.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategyOptionsState(false)

  // Switch the median on, then back off.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions(medianSettings(median))
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      medianSettings(median)
    )
    expectStrategyOptionsState(median)
  }

  // Invalid strategy options -> error expected from the setter.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
642
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected once the queue is enabled without overrides.
  const defaultTasksQueueOptions = {
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  const expectQueueEnabledWith = expectedOptions => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabledWith(defaultTasksQueueOptions)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabledWith({ ...defaultTasksQueueOptions, concurrency: 2 })
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
673
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Fresh copy of the tasks queue options expected when no override is in effect.
  const defaultTasksQueueOptions = () => ({
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Every worker node back pressure size must equal the pool tasks queue size.
  const expectBackPressureSizeMatchesQueueSize = () => {
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultTasksQueueOptions())
  expectBackPressureSizeMatchesQueueSize()

  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectBackPressureSizeMatchesQueueSize()

  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultTasksQueueOptions())
  expectBackPressureSizeMatchesQueueSize()

  // Invalid tasks queue options -> error expected from the setter.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
759
it('Verify that pool info is set', async () => {
  // Activity counters expected on a pool that has not been given any task.
  const noActivity = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...noActivity
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...noActivity
  })
  await dynamicPool.destroy()
})
805
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Measurement statistics holding only a new, empty history.
  const emptyStatistics = () => ({ history: new CircularArray() })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyStatistics(),
      waitTime: emptyStatistics(),
      elu: {
        idle: emptyStatistics(),
        active: emptyStatistics()
      }
    })
  }
  await pool.destroy()
})
841
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque whose maximum size is still zero.
  const expectEmptyTasksQueues = inspectedPool => {
    for (const node of inspectedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(fixedPool)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(dynamicPool)
  await dynamicPool.destroy()
})
867
it('Verify that pool worker info are initialized', async () => {
  // Every worker node info must report the given worker type, be non dynamic and ready.
  const expectWorkerNodesInfo = (inspectedPool, workerType) => {
    for (const node of inspectedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const fixedPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(fixedPool, WorkerTypes.cluster)
  await fixedPool.destroy()

  const dynamicPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerNodesInfo(dynamicPool, WorkerTypes.thread)
  await dynamicPool.destroy()
})
899
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(true)
  // Starting a second time must be refused.
  const alreadyStartedError = new Error('Cannot start an already started pool')
  expect(() => pool.start()).toThrow(alreadyStartedError)

  await pool.destroy()
  expectStartedAndReady(false)
  // Destroying a second time must be refused.
  const alreadyDestroyedError = new Error(
    'Cannot destroy an already destroyed pool'
  )
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyedError)
})
917
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      startWorkers: false
    }
  )
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Created with `startWorkers: false`: nothing is running yet.
  expectStartedAndReady(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  pool.start()
  expectStartedAndReady(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
944
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // execute() arguments -> TypeError the returned promise must reject with.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)
})
967
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Usage expected on each worker node for the given executed / executing task counts.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  const expectUsageOnEveryWorkerNode = (executed, executing) => {
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual(expectedUsage(executed, executing))
    }
  }

  // Submit numberOfWorkers * maxMultiplier tasks without awaiting them.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  expectUsageOnEveryWorkerNode(0, maxMultiplier)
  await Promise.all(pendingTasks)
  expectUsageOnEveryWorkerNode(maxMultiplier, 0)
  await pool.destroy()
})
1035
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Usage expected on an idle worker node; `executed` is either a value or a matcher.
  const expectedIdleUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  const expectEmptyHistories = node => {
    expect(node.usage.runTime.history.length).toBe(0)
    expect(node.usage.waitTime.history.length).toBe(0)
    expect(node.usage.elu.idle.history.length).toBe(0)
    expect(node.usage.elu.active.history.length).toBe(0)
  }

  const submittedTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(submittedTasks)

  // After the tasks completed: executed counters are set, histories are still empty.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedIdleUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(
      numberOfWorkers * maxMultiplier
    )
    expectEmptyHistories(node)
  }

  // After the strategy change: executed counters are back to zero.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedIdleUsage(0))
    expectEmptyHistories(node)
  }
  await pool.destroy()
})
1117
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Pool info payloads received by the `ready` listener, in arrival order.
  const receivedInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    receivedInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(receivedInfos.length).toBe(1)
  expect(receivedInfos[receivedInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1152
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // The emitter reports no event names before the test subscribes.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the 'busy' event, in emission order.
  const busyPayloads = []
  pool.emitter.on(PoolEvents.busy, payload => {
    busyPayloads.push(payload)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyPayloads.length).toBe(numberOfWorkers + 1)
  // Fixed fields are pinned; numeric counters only need to be numbers.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  // Only the payload of the last emission is checked.
  expect(busyPayloads.at(-1)).toStrictEqual(expectedPoolInfo)
  await pool.destroy()
})
1192
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // Dynamic thread pool bounded by floor(numberOfWorkers / 2) and numberOfWorkers.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // The emitter reports no event names before the test subscribes.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the 'full' event, in emission order.
  const fullPayloads = []
  pool.emitter.on(PoolEvents.full, payload => {
    fullPayloads.push(payload)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the pool maximum size, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The callback is expected to run exactly once for this burst.
  expect(fullPayloads.length).toBe(1)
  // Fixed fields are pinned; numeric counters only need to be numbers.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(fullPayloads.at(-1)).toStrictEqual(expectedPoolInfo)
  await pool.destroy()
})
1231
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to always report back pressure so the event path is exercised.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payloads received with the 'backPressure' event, in emission order.
  const backPressurePayloads = []
  pool.emitter.on(PoolEvents.backPressure, payload => {
    backPressurePayloads.push(payload)
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(backPressurePayloads.length).toBe(1)
  // With the tasks queue enabled the info payload also carries queue related fields.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(backPressurePayloads.at(-1)).toStrictEqual(expectedPoolInfo)
  // Number of times the stubbed method has been consulted during the scenario.
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
1278
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  const maxMultiplier = 4
  const submittedTasksCount = numberOfWorkers * maxMultiplier
  // Count every 'taskFinished' notification emitted by any worker node.
  let finishedTasksCount = 0
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', () => {
      finishedTasksCount += 1
    })
  })
  // Tasks are submitted without awaiting them: destroy() is what is being timed.
  for (let submitted = 0; submitted < submittedTasksCount; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  // All submitted tasks must have finished by the time destroy() resolves,
  // and destroy() must have stayed within the configured timeout (plus slack).
  expect(finishedTasksCount).toBe(submittedTasksCount)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1307
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  const maxMultiplier = 4
  const submittedTasksCount = numberOfWorkers * maxMultiplier
  // Count every 'taskFinished' notification emitted by any worker node.
  let finishedTasksCount = 0
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', () => {
      finishedTasksCount += 1
    })
  })
  // Tasks are submitted without awaiting them: destroy() is what is being timed.
  for (let submitted = 0; submitted < submittedTasksCount; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  // No 'taskFinished' notification is expected here, and destroy() must
  // return no later than the configured timeout plus some slack.
  expect(finishedTasksCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1335
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last resource created with the 'poolifier:task' type.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  // Hook counting the lifecycle callbacks related to the task async resource.
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Counts promise resolutions happening while the task resource is the current execution context.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // The hook is process-wide: switch it off even when the task execution
    // rejects, so that it cannot stay enabled while the following tests run.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1372
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the answer hasTaskFunction() must give.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every expectation, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [taskFunctionName, isKnown] of expectations) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(isKnown)
    }
    await pool.destroy()
  }
  // The second pool is only created once the first one has been destroyed.
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1402
it('Verify that addTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Invalid arguments: [name, fn, expected TypeError message].
  const noop = () => {}
  const invalidCalls = [
    [0, noop, 'name argument must be a string'],
    ['', noop, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(pool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected calls must not have changed the task function names list.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    pool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  // The function is registered on the pool side and listed after the existing names.
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function can be executed by name and echoes its input.
  const taskFunctionData = { test: 'test' }
  const echoResult = await pool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // Builds a measurement entry holding a fresh, empty history.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await pool.destroy()
})
1474
it('Verify that removeTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // Names listed by the pool before and after the 'echo' task function is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  // 'test' is listed but was not added through the pool, so removing it is refused.
  await expect(pool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await pool.addTaskFunction('echo', echoTaskFunction)
  expect(pool.taskFunctions.size).toBe(1)
  expect(pool.taskFunctions.get('echo')).toStrictEqual(echoTaskFunction)
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  // Removing a task function added through the pool succeeds and restores the initial state.
  await expect(pool.removeTaskFunction('echo')).resolves.toBe(true)
  expect(pool.taskFunctions.size).toBe(0)
  expect(pool.taskFunctions.get('echo')).toBeUndefined()
  expect(pool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await pool.destroy()
})
1513
it('Verify that listTaskFunctionNames() is working', async () => {
  // Waits for the pool to be ready, checks the listed names, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    await pool.destroy()
  }
  // The second pool is only created once the first one has been destroyed.
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1541
it('Verify that setDefaultTaskFunction() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  // The expected error messages embed the id of the first worker node.
  const workerId = pool.workerNodes[0].info.id
  // Rejected calls: [name argument, expected error message].
  const rejectedCalls = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
    ]
  ]
  for (const [name, message] of rejectedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).rejects.toThrow(
      new Error(message)
    )
  }
  // The rejected calls leave the names order untouched.
  expect(pool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Accepted calls: [name argument, names order expected right after the call].
  const acceptedCalls = [
    [
      'factorial',
      [DEFAULT_TASK_NAME, 'factorial', 'jsonIntegerSerialization', 'fibonacci']
    ],
    [
      'fibonacci',
      [DEFAULT_TASK_NAME, 'fibonacci', 'jsonIntegerSerialization', 'factorial']
    ]
  ]
  for (const [name, expectedNames] of acceptedCalls) {
    await expect(pool.setDefaultTaskFunction(name)).resolves.toBe(true)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1595
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, strictly one after the other.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  // Shape every per task function usage must have once the tasks are done.
  const anyHistory = { history: expect.any(CircularArray) }
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: anyHistory,
    waitTime: anyHistory,
    elu: {
      idle: anyHistory,
      active: anyHistory
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctionNames()) {
      const usage = workerNode.getTaskFunctionWorkerUsage(name)
      expect(usage).toStrictEqual(expectedUsage)
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the second listed name.
    const secondTaskFunctionName = workerNode.info.taskFunctionNames[1]
    expect(
      workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(secondTaskFunctionName)
    )
  }
  await pool.destroy()
})
1660
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Key 0 is accepted and the returned promise resolves without a value.
  const validWorkerNodeKey = 0
  const killOutcome = pool.sendKillMessageToWorker(validWorkerNodeKey)
  await expect(killOutcome).resolves.toBeUndefined()
  // numberOfWorkers used as a key is rejected with an explicit error.
  const invalidWorkerNodeKey = numberOfWorkers
  await expect(
    pool.sendKillMessageToWorker(invalidWorkerNodeKey)
  ).rejects.toStrictEqual(
    new Error(`Invalid worker node key '${numberOfWorkers}'`)
  )
  await pool.destroy()
})
1678
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // Operation message asking to add an empty task function, sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addEmptyTaskFunction)
  ).resolves.toBe(true)
  // The targeted worker node lists the new name after the existing ones.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1698
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Operation message asking to add an empty task function, sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  ).resolves.toBe(true)
  // Every worker node lists the new name after the existing ones.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.info.taskFunctionNames).toStrictEqual(expectedNames)
  })
  await pool.destroy()
})
1721 })