test: less strict expectation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Package version, read from the package.json located two directories above this test file.
const version = (() => {
  const testDirectory = dirname(fileURLToPath(import.meta.url))
  const manifestPath = join(testDirectory, '../..', 'package.json')
  return JSON.parse(readFileSync(manifestPath, 'utf8')).version
})()
// Number of workers handed to the pools built by the tests below.
const numberOfWorkers = 2
/**
 * Fixed thread pool whose `isMain()` always answers `false`.
 * Used below to check that such a pool refuses to be constructed.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  isMain () { return false }
}
37
// Call sinon's `restore()` after every test.
afterEach(function restoreSinon () {
  restore()
})
41
it('Verify that pool can be created and destroyed', async () => {
  // Build a fixed thread pool, check its class, then tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  // StubPoolWithIsMain#isMain() returns false, so the constructor is expected to throw.
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPool).toThrow(expectedError)
})
67
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status flag name -> value expected right after construction, checked in this order.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [status, expectedValue] of expectedStatuses) {
    expect(pool[status]).toBe(expectedValue)
  }
  await pool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // One factory per invalid file path argument: missing, not a string, unknown file.
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const withNumericFilePath = () => new FixedThreadPool(numberOfWorkers, 0)
  const withUnknownFilePath = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')

  expect(withoutFilePath).toThrow(
    new TypeError('The worker file path must be specified')
  )
  expect(withNumericFilePath).toThrow(
    new TypeError('The worker file path must be a string')
  )
  expect(withUnknownFilePath).toThrow(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
90
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be rejected by the constructor.
  const createPool = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  // -1 workers must be rejected with a RangeError.
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers must be rejected with a TypeError.
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
126
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool receives a fourth argument (a maximum size) and must refuse it.
  const maximumSize = numberOfWorkers * 2
  const createPool = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      maximumSize
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createPool).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // [pool class, min size, max size, worker file, expected error], evaluated in order.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrow(expectedError)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const noMedian = { median: false }

  // Part 1: a pool created without options exposes the default options.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
    retries:
      pool.info.maxSize +
      Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
    runTime: noMedian,
    waitTime: noMedian,
    elu: noMedian,
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    // Only this subset is pinned: objectContaining tolerates additional keys such as `weights`.
    expect(strategy.opts).toStrictEqual(
      expect.objectContaining({
        retries: pool.info.maxSize + Object.keys(strategy.opts.weights).length,
        runTime: noMedian,
        waitTime: noMedian,
        elu: noMedian
      })
    )
  }
  await pool.destroy()

  // Part 2: explicitly passed options are reflected by the pool.
  const testHandler = () => console.info('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, customOptions)
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  // Fresh expected object for the context and for each strategy.
  const expectedStrategyOpts = () => ({
    retries:
      pool.info.maxSize +
      Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedStrategyOpts()
  )
  for (const [, strategy] of pool.workerChoiceStrategyContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(expectedStrategyOpts())
  }
  await pool.destroy()
})
327
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // [pool options, error expected from the constructor], evaluated in order.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
467
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Expected options for an `opts` object whose runTime and elu medians equal `median`.
  const expectedOptsFor = (actualOpts, median) => ({
    retries: pool.info.maxSize + Object.keys(actualOpts.weights).length,
    runTime: { median },
    waitTime: { median: false },
    elu: { median },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  // Checks the strategy context options, every strategy's options, then the
  // task statistics requirements, in that order.
  const verifyStrategyState = median => {
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptsFor(pool.workerChoiceStrategyContext.opts, median)
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(
        expectedOptsFor(strategy.opts, median)
      )
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no strategy options stored on the pool.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  verifyStrategyState(false)

  // Switch the runTime and elu medians on, then off again.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median },
      elu: { median }
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      runTime: { median },
      elu: { median }
    })
    verifyStrategyState(median)
  }

  // Invalid strategy options are rejected.
  expect(() =>
    pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
  ).toThrow(
    new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  )
  expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
    new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
  ).toThrow(
    new Error(
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    )
  )
  await pool.destroy()
})
650
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected once the queue is enabled with the given concurrency.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }

  expectQueueDisabled()

  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(1))

  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(2))

  pool.enableTasksQueue(false)
  expectQueueDisabled()

  await pool.destroy()
})
681
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Options expected when only `enableTasksQueue` (or a partial update) is given.
  const defaultQueueOptions = () => ({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Fully specified options, used both as input and as expectation (fresh object each time).
  const customQueueOptions = () => ({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  // Every worker node's back pressure size must follow the configured queue size.
  const expectBackPressureSizeInSync = () => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions())
  expectBackPressureSizeInSync()

  pool.setTasksQueueOptions(customQueueOptions())
  expect(pool.opts.tasksQueueOptions).toStrictEqual(customQueueOptions())
  expectBackPressureSizeInSync()

  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expect(pool.opts.tasksQueueOptions).toStrictEqual(defaultQueueOptions())
  expectBackPressureSizeInSync()

  // [invalid tasks queue options, expected error], evaluated in order.
  const invalidQueueOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidQueueOptions) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(expectedError)
  }
  await pool.destroy()
})
767
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both pools checked below.
  const sharedInfo = {
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool: every size field equals numberOfWorkers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await pool.destroy()

  // Dynamic cluster pool: only the minimum number of worker nodes is expected.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    ...sharedInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await pool.destroy()
})
813
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // A measurement statistics entry holding a freshly constructed CircularArray.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    const pristineUsage = {
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    }
    expect(workerNode.usage).toStrictEqual(pristineUsage)
  }
  await pool.destroy()
})
849
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own a Deque with size 0 and maxSize 0.
  const expectEmptyTasksQueues = checkedPool => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
})
875
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must report a numeric id, the given worker type,
  // `dynamic: false` and `ready: true`.
  const expectWorkerInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(pool, WorkerTypes.thread)
  await pool.destroy()
})
907
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // `started` and `ready` are expected to carry the same value at each checkpoint.
  const expectStartedAndReady = expectedFlag => {
    expect(pool.info.started).toBe(expectedFlag)
    expect(pool.info.ready).toBe(expectedFlag)
  }

  expectStartedAndReady(true)
  // Starting twice is refused synchronously.
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )
  await pool.destroy()
  expectStartedAndReady(false)
  // Destroying twice is refused through a rejected promise.
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
925
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start(): not started, not ready, no worker node, tasks are refused.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start(): started and ready, then the ready event is awaited.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const startedNode of pool.workerNodes) {
    expect(startedNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
952
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // [execute() arguments, expected rejection], awaited one after another.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArguments, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArguments)).rejects.toThrow(
      expectedError
    )
  }
  // An unknown task function name rejects with a plain string, hence toBe().
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool refuses any task.
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
975
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Usage expected on every worker node for the given executed/executing counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })

  // Submit maxMultiplier tasks per worker without awaiting them.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While the tasks are in flight, each node reports maxMultiplier executing tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once they settled, the same count is reported as executed.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1043
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const totalTasks = numberOfWorkers * maxMultiplier
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Usage expected on a worker node, parameterized by the `executed` expectation.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  // All four measurement histories must be empty.
  const expectEmptyHistories = ({ usage }) => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)

  // After the run: each node executed between 1 and totalTasks tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(totalTasks)
    expectEmptyHistories(workerNode)
  }

  // Changing the strategy must bring the executed counter back to zero.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
1125
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Count the 'ready' notifications and remember the payload of the latest one.
  let readyCount = 0
  let lastReadyInfo
  const onReady = info => {
    readyCount += 1
    lastReadyInfo = info
  }
  pool.emitter.on(PoolEvents.ready, onReady)
  await waitPoolEvents(pool, PoolEvents.ready, 1)

  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  // Sizes and counters are only required to be numbers.
  const numericFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'failedTasks'
  ]
  expect(lastReadyInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...Object.fromEntries(
      numericFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
1160
// Checks that a listener registered for the pool 'busy' event is invoked the
// expected number of times and receives the pool information object.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  const onBusy = info => {
    busyCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.busy, onBusy)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once, then wait for all of them.
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1200
// Checks that a listener registered for the pool 'full' event is invoked once
// and receives the pool information object.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  const onFull = info => {
    fullCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the maximum pool size, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1239
// Checks that a listener registered for the pool 'backPressure' event is invoked
// once when hasBackPressure() is stubbed to always report back pressure.
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      enableTasksQueue: true
    }
  )
  // Force the pool to report back pressure regardless of its real queue state.
  const hasBackPressureStub = stub(pool, 'hasBackPressure')
  hasBackPressureStub.returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  const onBackPressure = info => {
    backPressureCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureCount).toBe(1)
  const anyNumber = expect.any(Number)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
1286
// Checks that destroy() lets every submitted task finish when the configured
// tasks finished timeout (2500 ms) leaves enough time for them.
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const poolOptions = {
    enableTasksQueue: true,
    tasksQueueOptions: { tasksFinishedTimeout }
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    poolOptions
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let finishedCount = 0
  const countFinishedTask = () => {
    finishedCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', countFinishedTask)
  }
  // Tasks are submitted without awaiting them, so that some are still queued when destroy() starts.
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(finishedCount).toBe(submittedTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1315
// Checks that destroy() returns within the tasks finished timeout (plus a margin)
// and that no 'taskFinished' event was observed by then.
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const poolOptions = {
    enableTasksQueue: true,
    tasksQueueOptions: { tasksFinishedTimeout }
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    poolOptions
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let finishedCount = 0
  const countFinishedTask = () => {
    finishedCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', countFinishedTask)
  }
  // Tasks are submitted without awaiting them, so that some are still queued when destroy() starts.
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(finishedCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1343
// Checks that executing one task creates exactly one 'poolifier:task' async
// resource and that its before/after/promiseResolve hooks each fire once.
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the most recently initialized 'poolifier:task' resource.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Count only promise resolutions happening inside the task resource's execution context.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // The hook is process-wide: always disable it, even when the task rejects,
    // so that it does not stay active during the tests that run afterwards.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1380
// Checks hasTaskFunction() against known and unknown task function names,
// first on a dynamic thread pool, then on a fixed cluster pool.
it('Verify that hasTaskFunction() is working', async () => {
  // [task function name, expected hasTaskFunction() result], checked in this order.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, expected] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(taskFunctionName)).toBe(expected)
  }
  await dynamicThreadPool.destroy()
  // The cluster pool is only created once the thread pool has been destroyed.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, expected] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(taskFunctionName)).toBe(expected)
  }
  await fixedClusterPool.destroy()
})
1410
// Checks addTaskFunction(): argument validation, registration of a new task
// function on the pool, its execution, and the per-worker usage it produces.
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // [name argument, fn argument, expected TypeError message], checked one after the other.
  const noop = () => {}
  const invalidCalls = [
    [0, noop, 'name argument must be a string'],
    ['', noop, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected calls must leave the list of task function names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  const added = dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  await expect(added).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)
  // Builds a fresh expected measurement holding an empty history.
  const emptyMeasurement = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    const echoUsage = node.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyMeasurement(),
      waitTime: emptyMeasurement(),
      elu: {
        idle: emptyMeasurement(),
        active: emptyMeasurement()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1482
// Checks removeTaskFunction(): a function not registered on the pool side is
// refused, while one added through addTaskFunction() can be removed again.
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Names listed before anything is added, and again once 'echo' is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  const removalOfWorkerSideFunction =
    dynamicThreadPool.removeTaskFunction('test')
  await expect(removalOfWorkerSideFunction).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...initialNames,
    'echo'
  ])
  const removalOfEcho = dynamicThreadPool.removeTaskFunction('echo')
  await expect(removalOfEcho).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1521
// Checks that listTaskFunctionNames() returns the same ordered names on a
// dynamic thread pool and on a fixed cluster pool.
it('Verify that listTaskFunctionNames() is working', async () => {
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const threadPoolNames = dynamicThreadPool.listTaskFunctionNames()
  expect(threadPoolNames).toStrictEqual(expectedNames)
  await dynamicThreadPool.destroy()
  // The cluster pool is only created once the thread pool has been destroyed.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  const clusterPoolNames = fixedClusterPool.listTaskFunctionNames()
  expect(clusterPoolNames).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1549
// Checks setDefaultTaskFunction(): invalid names are rejected with the error
// reported for the first worker, and valid names reorder the listed names.
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  // Builds the error expected when the 'default' operation fails on the first worker.
  const operationError = reason =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${reason}'`
    )
  // [name argument, failure reason reported by the worker], checked one after the other.
  const rejectedNames = [
    [0, 'TypeError: name parameter is not a string'],
    [
      DEFAULT_TASK_NAME,
      'Error: Cannot set the default task function reserved name as the default task function'
    ],
    [
      'unknown',
      'Error: Cannot set the default task function to a non-existing task function'
    ]
  ]
  for (const [name, reason] of rejectedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(operationError(reason))
  }
  // The rejected calls must leave the order of the names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  const factorialAsDefault =
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  await expect(factorialAsDefault).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  const fibonacciAsDefault =
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  await expect(fibonacciAsDefault).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1603
// Checks that a worker exposing several task functions executes each of them
// and that per-task-function usage is recorded on the worker nodes.
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Tasks are run one after the other: default function first, then each named function.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  const { executingTasks } = pool.info
  expect(executingTasks).toBe(0)
  const { executedTasks } = pool.info
  expect(executedTasks).toBe(4)
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    // The size is asserted before any usage lookup below.
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      const anyHistory = expect.any(CircularArray)
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: { history: anyHistory },
        waitTime: { history: anyHistory },
        elu: {
          idle: { history: anyHistory },
          active: { history: anyHistory }
        }
      })
      const executedCount =
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      expect(executedCount).toBeGreaterThan(0)
    }
    // The default task name's usage must equal the usage of the second listed task function.
    const defaultUsage = node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const secondListedUsage = node.getTaskFunctionWorkerUsage(
      node.info.taskFunctionNames[1]
    )
    expect(defaultUsage).toStrictEqual(secondListedUsage)
  }
  await pool.destroy()
})
1668
// Checks that sendKillMessageToWorker() resolves for worker node key 0 and
// rejects for a worker node key equal to numberOfWorkers.
it('Verify sendKillMessageToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  const killOfFirstWorker = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killOfFirstWorker).resolves.toBeUndefined()
  const killWithInvalidKey = pool.sendKillMessageToWorker(numberOfWorkers)
  const expectedError = new Error(
    `Invalid worker node key '${numberOfWorkers}'`
  )
  await expect(killWithInvalidKey).rejects.toStrictEqual(expectedError)
  await pool.destroy()
})
1686
// Checks that an 'add' task function operation sent to the worker node with key 0
// resolves to true and that the worker then lists the added name.
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // The task function is sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorker(
    workerNodeKey,
    addOperation
  )
  await expect(operationResult).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1706
// Checks that an 'add' task function operation sent to the pool's workers resolves
// to true and that every worker node then lists the added name.
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // The task function is sent as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationResult).resolves.toBe(true)
  for (const node of pool.workerNodes) {
    const { taskFunctionNames } = node.info
    expect(taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
1729 })