cf4b55afc26a6a1278f886a0d4c2762b1c230f07
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Version string read from the package.json located two directories above this test file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Worker count passed to the pools created by the tests of this suite.
const numberOfWorkers = 2
/**
 * FixedThreadPool subclass whose `isMain()` always answers `false`.
 * Used by the suite to check that pool creation is refused in that case.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
37
// Call sinon's `restore()` once each test has finished.
afterEach(function restoreSinon () {
  restore()
})
41
// Smoke test: construct a fixed thread pool, check its type, then tear it down.
it('Verify that pool can be created and destroyed', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
50
// A pool whose isMain() answers false must refuse to be constructed.
it('Verify that pool cannot be created from a non main thread/process', () => {
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  expect(createStubPool).toThrow(expectedError)
})
67
// Checks the status flags exposed by a freshly constructed pool.
it('Verify that pool statuses properties are set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [statusName, expectedValue] of expectedStatuses) {
    expect(pool[statusName]).toBe(expectedValue)
  }
  await pool.destroy()
})
78
// Each entry pairs constructor arguments with the error the constructor must throw.
it('Verify that filePath is checked', () => {
  const invalidFilePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified')
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string')
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'")
    ]
  ]
  for (const [constructorArgs, expectedError] of invalidFilePathCases) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(
      expectedError
    )
  }
})
90
// Omitting the number of workers must be rejected by the constructor.
it('Verify that numberOfWorkers is checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  const createPoolWithoutSize = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
104
// A negative worker count must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrow(expectedError)
})
115
// A fractional worker count must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  expect(createFractionalSizedPool).toThrow(expectedError)
})
126
// Passing a fourth (maximum size) argument to a fixed pool must be rejected.
it('Verify that pool arguments number and pool type are checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
// Each entry pairs a dynamic pool factory using invalid min/max sizes with
// the error its constructor must throw. Entries are checked in order.
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidSizingCases = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizingCases) {
    expect(createPool).toThrow(expectedError)
  }
})
217
// Checks the options exposed by a pool built without options, then by a
// pool built with an explicit options object.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Pool created without any option.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  // Builds the expected strategy options from the weights actually in place.
  const expectedDefaultStrategyOpts = weights => ({
    retries: pool.info.maxSize + Object.keys(weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  const defaultContext = pool.workerChoiceStrategyContext
  expect(defaultContext.opts).toStrictEqual(
    expectedDefaultStrategyOpts(defaultContext.opts.weights)
  )
  for (const [, strategy] of defaultContext.workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(
      expectedDefaultStrategyOpts(strategy.opts.weights)
    )
  }
  await pool.destroy()

  // Pool created with explicit options.
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  const expectedCustomStrategyOpts = () => ({
    retries:
      pool.info.maxSize +
      Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  const customContext = pool.workerChoiceStrategyContext
  expect(customContext.opts).toStrictEqual(expectedCustomStrategyOpts())
  for (const [, strategy] of customContext.workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(expectedCustomStrategyOpts())
  }
  await pool.destroy()
})
325
// Each entry pairs an invalid pool options object with the error the
// constructor must throw. Entries are checked in order.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
465
// Toggles the runTime/elu `median` options on a FAIR_SHARE pool and checks,
// after each change, the pool options, the context and per-strategy options
// and the task statistics requirements. Invalid options are checked last.
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Expected strategy options for the given weights and median flag.
  const expectedStrategyOpts = (weights, median) => ({
    retries: pool.info.maxSize + Object.keys(weights).length,
    runTime: { median },
    waitTime: { median: false },
    elu: { median },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [pool.info.maxSize - 1]: expect.any(Number)
    })
  })
  // Asserts the options held by the context, then by each of its strategies.
  const expectStrategiesOpts = median => {
    const context = pool.workerChoiceStrategyContext
    expect(context.opts).toStrictEqual(
      expectedStrategyOpts(context.opts.weights, median)
    )
    for (const [, strategy] of context.workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(
        expectedStrategyOpts(strategy.opts.weights, median)
      )
    }
  }
  // Asserts the statistics requirements: average and median are mutually
  // exclusive for runTime and elu, waitTime is never required.
  const expectTaskStatisticsRequirements = median => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no explicit strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesOpts(false)
  expectTaskStatisticsRequirements(false)

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategiesOpts(true)
  expectTaskStatisticsRequirements(true)

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategiesOpts(false)
  expectTaskStatisticsRequirements(false)

  // Invalid options must be rejected.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
648
// Enables, reconfigures and disables the tasks queue, checking the pool
// options after each step.
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Options expected when the queue is enabled with the given concurrency.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(
      expectedQueueOptions(concurrency)
    )
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
679
// Changes the tasks queue options at runtime and checks both the pool
// options and the back pressure size of every worker node after each
// change. Invalid options are checked last.
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  const defaultQueueOptions = () => ({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  })
  // Asserts the pool options, then that each worker node follows the queue size.
  const expectQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueOptions(defaultQueueOptions())

  // Every option set explicitly.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })
  expectQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  })

  // Partial options: the omitted ones are expected at their default values.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(defaultQueueOptions())

  // Invalid options must be rejected.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [queueOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
765
// Checks `pool.info` right after construction for a fixed thread pool and
// for a dynamic cluster pool.
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both freshly created pools.
  const freshPoolInfo = () => ({
    version,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    ...freshPoolInfo(),
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    ...freshPoolInfo(),
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize
  })
  await dynamicPool.destroy()
})
811
// Every worker node of a fresh pool must expose zeroed task counters and
// empty measurement histories.
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const emptyHistory = () => ({ history: new CircularArray() })
  const initialUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: emptyHistory(),
    waitTime: emptyHistory(),
    elu: {
      idle: emptyHistory(),
      active: emptyHistory()
    }
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode).toBeInstanceOf(WorkerNode)
    expect(workerNode.usage).toStrictEqual(initialUsage())
  }
  await pool.destroy()
})
847
// Every worker node of a fresh pool must own an empty Deque as tasks queue.
it('Verify that pool worker tasks queue are initialized', async () => {
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      const { tasksQueue } = workerNode
      expect(tasksQueue).toBeInstanceOf(Deque)
      expect(tasksQueue.size).toBe(0)
      expect(tasksQueue.maxSize).toBe(0)
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(clusterPool)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(threadPool)
  await threadPool.destroy()
})
873
// Checks the `info` object of every worker node right after pool creation,
// for a cluster pool and then for a thread pool.
it('Verify that pool worker info are initialized', async () => {
  const expectWorkerNodesInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true,
        stealing: false
      })
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(clusterPool, WorkerTypes.cluster)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerNodesInfo(threadPool, WorkerTypes.thread)
  await threadPool.destroy()
})
907
// Starting a started pool and destroying a destroyed pool must both fail.
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const alreadyStarted = new Error('Cannot start an already started pool')
  const alreadyDestroyed = new Error(
    'Cannot destroy an already destroyed pool'
  )

  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  expect(() => pool.start()).toThrow(alreadyStarted)

  await pool.destroy()
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyed)
})
925
// A pool built with `startWorkers: false` has no worker nodes and refuses
// tasks until `start()` is called.
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )

  // Before start().
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start().
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const startedWorkerNode of pool.workerNodes) {
    expect(startedWorkerNode).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
952
// Invalid `execute()` arguments must reject, as must any call made once the
// pool is destroyed.
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry pairs execute() arguments with the expected rejection error.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)
})
975
// Submits `numberOfWorkers * maxMultiplier` tasks and checks the per worker
// task counters while the tasks are in flight and once they all completed.
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  const expectWorkerNodesUsage = (executed, executing) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual(
        expectedUsage(executed, executing)
      )
    }
  }

  const maxMultiplier = 2
  const submittedTasks = new Set()
  let remainingTasks = numberOfWorkers * maxMultiplier
  while (remainingTasks-- > 0) {
    submittedTasks.add(pool.execute())
  }
  // Tasks submitted but not yet awaited.
  expectWorkerNodesUsage(0, maxMultiplier)
  await Promise.all(submittedTasks)
  // All tasks completed.
  expectWorkerNodesUsage(maxMultiplier, 0)
  await pool.destroy()
})
1043
// Runs tasks on a dynamic thread pool, then switches the worker choice
// strategy and checks that the executed task counters are back to zero.
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory(),
    waitTime: anyHistory(),
    elu: {
      idle: anyHistory(),
      active: anyHistory()
    }
  })
  const expectEmptyHistories = ({ runTime, waitTime, elu }) => {
    expect(runTime.history.length).toBe(0)
    expect(waitTime.history.length).toBe(0)
    expect(elu.idle.history.length).toBe(0)
    expect(elu.active.history.length).toBe(0)
  }

  const maxMultiplier = 2
  const totalTasks = numberOfWorkers * maxMultiplier
  const submittedTasks = new Set()
  for (let taskIndex = 0; taskIndex < totalTasks; taskIndex++) {
    submittedTasks.add(pool.execute())
  }
  await Promise.all(submittedTasks)

  // Before the strategy change: each worker node executed at least one task.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(totalTasks)
    expectEmptyHistories(workerNode.usage)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // After the strategy change: the counters are reset.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(workerNode.usage)
  }
  await pool.destroy()
})
1125
// Registers a `ready` listener and checks it is called exactly once with
// the pool information.
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Payload of every `ready` event received, in order of arrival.
  const receivedInfos = []
  pool.emitter.on(PoolEvents.ready, info => {
    receivedInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(receivedInfos.length).toBe(1)
  const lastInfo = receivedInfos[receivedInfos.length - 1]
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
1160
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.busy, info => {
    lastInfo = info
    busyCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const tasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(tasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1200
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.full, info => {
    lastInfo = info
    fullCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the second size argument given to the pool.
  const tasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(tasks)
  // The 'full' listener is expected to have fired exactly once.
  expect(fullCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  await pool.destroy()
})
1239
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force hasBackPressure() to report true; the stub also records how often it is called.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let lastInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    lastInfo = info
    backPressureCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than the number of workers.
  const tasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(tasks)
  expect(backPressureCount).toBe(1)
  const expectedInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    stealingWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  }
  expect(lastInfo).toStrictEqual(expectedInfo)
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
1287
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count every 'taskFinished' event emitted by any worker node.
  let finishedCount = 0
  const onTaskFinished = () => {
    finishedCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Tasks are submitted without being awaited so that some remain queued when destroy() is called.
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const startTime = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - startTime
  expect(finishedCount).toBeLessThanOrEqual(totalTasks)
  // destroy() must have taken at least 2000 ms and at most the timeout plus a 400 ms margin.
  expect(elapsedTime).toBeGreaterThanOrEqual(2000)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
})
1316
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  // Count every 'taskFinished' event emitted by any worker node.
  let finishedCount = 0
  const onTaskFinished = () => {
    finishedCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Tasks are submitted without being awaited so that some remain queued when destroy() is called.
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const startTime = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - startTime
  // No 'taskFinished' event is expected, and destroy() must return within the timeout plus an 800 ms margin.
  expect(finishedCount).toBe(0)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1344
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Async id of the last resource created with the 'poolifier:task' type.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promise resolutions that happen inside the task resource execution scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // The hook is process-wide: always disable it, even when the task rejects,
    // so that it cannot keep observing the tests that run afterwards.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1381
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the value hasTaskFunction() must return for them.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  // Waits for the pool to be ready, checks every expectation, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    for (const [name, expected] of expectations) {
      expect(pool.hasTaskFunction(name)).toBe(expected)
    }
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1411
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Invalid (name, fn) argument pairs and the TypeError message each call must reject with.
  const invalidCalls = [
    [0, () => {}, 'name argument must be a string'],
    ['', () => {}, 'name argument must not be an empty string'],
    ['test', 0, 'fn argument must be a function'],
    ['test', '', 'fn argument must be a function']
  ]
  for (const [name, fn, message] of invalidCalls) {
    await expect(dynamicThreadPool.addTaskFunction(name, fn)).rejects.toThrow(
      new TypeError(message)
    )
  }
  // The rejected calls must not have changed the list of task function names.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])
  // The added task function must return its input when executed by name.
  const payload = { test: 'test' }
  const echoed = await dynamicThreadPool.execute(payload, 'echo')
  expect(echoed).toStrictEqual(payload)
  // Builds a measurement statistics entry holding a fresh, empty history.
  const emptyStatistics = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyStatistics(),
      waitTime: emptyStatistics(),
      elu: {
        idle: emptyStatistics(),
        active: emptyStatistics()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1483
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Names expected before 'echo' is added and again after it is removed.
  const baseNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(baseNames)
  // 'test' is listed but removing it must be rejected with this error.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )
  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    ...baseNames,
    'echo'
  ])
  const removal = dynamicThreadPool.removeTaskFunction('echo')
  await expect(removal).resolves.toBe(true)
  // After removal the pool-side registry is empty and the names are back to the base list.
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(baseNames)
  await dynamicThreadPool.destroy()
})
1522
it('Verify that listTaskFunctionNames() is working', async () => {
  // Names both pools are expected to list, in this exact order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  // Waits for the pool to be ready, checks the listed names, then destroys the pool.
  const verifyPool = async pool => {
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.listTaskFunctionNames()).toStrictEqual(expectedNames)
    await pool.destroy()
  }
  await verifyPool(
    new DynamicThreadPool(
      Math.floor(numberOfWorkers / 2),
      numberOfWorkers,
      './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
    )
  )
  await verifyPool(
    new FixedClusterPool(
      numberOfWorkers,
      './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
    )
  )
})
1550
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const workerId = dynamicThreadPool.workerNodes[0].info.id
  // Builds the error expected when the 'default' operation fails on the first worker node.
  const operationFailure = cause =>
    new Error(
      `Task function operation 'default' failed on worker ${workerId} with error: '${cause}'`
    )
  // Invalid names paired with the failure cause reported for each of them.
  const rejectedNames = [
    [0, 'TypeError: name parameter is not a string'],
    [
      DEFAULT_TASK_NAME,
      'Error: Cannot set the default task function reserved name as the default task function'
    ],
    [
      'unknown',
      'Error: Cannot set the default task function to a non-existing task function'
    ]
  ]
  for (const [name, cause] of rejectedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(operationFailure(cause))
  }
  // The rejected calls must leave the listed order untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // After each successful call the chosen name is listed right after the default task name.
  const factorialResult = dynamicThreadPool.setDefaultTaskFunction('factorial')
  await expect(factorialResult).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  const fibonacciResult = dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  await expect(fibonacciResult).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1604
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, strictly one after the other.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  // Shape every per-task-function usage entry must have.
  const expectedUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0
    },
    runTime: {
      history: expect.any(CircularArray)
    },
    waitTime: {
      history: expect.any(CircularArray)
    },
    elu: {
      idle: {
        history: expect.any(CircularArray)
      },
      active: {
        history: expect.any(CircularArray)
      }
    }
  }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskName)).toStrictEqual(
        expectedUsage
      )
      expect(
        node.getTaskFunctionWorkerUsage(taskName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // Usage under the default task name must equal usage under the second listed name.
    const secondListedName = node.info.taskFunctionNames[1]
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(secondListedName)
    )
  }
  await pool.destroy()
})
1669
it('Verify sendKillMessageToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Target the worker node at key 0; the returned promise must resolve to undefined.
  const workerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1682
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const workerNodeKey = 0
  // 'add' operation message carrying the source text of an empty function named 'empty'.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorker(
    workerNodeKey,
    addOperation
  )
  await expect(operationResult).resolves.toBe(true)
  // The targeted worker node must now list the added name last.
  const { taskFunctionNames } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1702
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // 'add' operation message carrying the source text of an empty function named 'empty'.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  const operationResult = pool.sendTaskFunctionOperationToWorkers(addOperation)
  await expect(operationResult).resolves.toBe(true)
  // Every worker node must now list the added name last.
  const expectedNames = [DEFAULT_TASK_NAME, 'test', 'empty']
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual(expectedNames)
  }
  await pool.destroy()
})
1725 })