fix: fix worker weights handling
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
23
24 describe('Abstract pool test suite', () => {
// Version string taken from the package.json found two directories above this test file.
const packageJsonPath = join(
  dirname(fileURLToPath(import.meta.url)),
  '../..',
  'package.json'
)
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Worker count shared by the pools created in this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool subclass whose `isMain()` is overridden to always
 * return `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
37
// Call sinon's `restore()` after every test.
afterEach(function restoreSinon () {
  restore()
})
41
it('Verify that pool can be created and destroyed', async () => {
  // Build a fixed thread pool, check its concrete type, then destroy it.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
50
it('Verify that pool cannot be created from a non main thread/process', () => {
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  // The stub reports `isMain()` as false, so construction is expected to throw.
  const createFromNonMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: err => console.error(err) }
    )
  expect(createFromNonMain).toThrow(expectedError)
})
67
it('Verify that pool statuses properties are set', async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status flag name -> value expected right after construction.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false]
  ]
  for (const [statusName, expectedValue] of expectedStatuses) {
    expect(threadPool[statusName]).toBe(expectedValue)
  }
  await threadPool.destroy()
})
78
it('Verify that filePath is checked', () => {
  // Each entry: constructor arguments -> error expected from the constructor.
  const invalidFilePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified')
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string')
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'")
    ]
  ]
  for (const [ctorArgs, expectedError] of invalidFilePathCases) {
    expect(() => new FixedThreadPool(...ctorArgs)).toThrow(expectedError)
  }
})
90
it('Verify that numberOfWorkers is checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  // The number of workers is deliberately passed as `undefined`.
  const createWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  expect(createWithoutSize).toThrow(expectedError)
})
104
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createWithNegativeSize).toThrow(expectedError)
})
115
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  expect(createWithFractionalSize).toThrow(expectedError)
})
126
it('Verify that pool arguments number and pool type are checked', () => {
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  // A fourth constructor argument is passed to a fixed pool on purpose.
  const createFixedPoolWithMaximum = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  expect(createFixedPoolWithMaximum).toThrow(expectedError)
})
142
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each case: pool class, (min, max) sizes, worker file and the expected constructor error.
  const invalidSizingCases = [
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: undefined,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0.5,
      max: 1,
      file: threadWorkerFile,
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 0,
      max: 0.5,
      file: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 2,
      max: 1,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      Pool: DynamicThreadPool,
      min: 0,
      max: 0,
      file: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    },
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: 1,
      file: clusterWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    }
  ]
  for (const { Pool, min, max, file, error } of invalidSizingCases) {
    expect(() => new Pool(min, max, file)).toThrow(error)
  }
})
217
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // Part 1: pool built without options.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(defaultPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
  })
  const defaultContext = defaultPool.workerChoiceStrategyContext
  expect(defaultContext.opts).toStrictEqual({
    retries:
      defaultPool.info.maxSize +
      Object.keys(defaultContext.opts.weights).length,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    weights: expect.objectContaining({
      0: expect.any(Number),
      [defaultPool.info.maxSize - 1]: expect.any(Number)
    })
  })
  for (const [, strategy] of defaultContext.workerChoiceStrategies) {
    // NOTE: `weights` is not asserted per strategy; objectContaining only
    // checks the keys listed below.
    expect(strategy.opts).toStrictEqual(
      expect.objectContaining({
        retries:
          defaultPool.info.maxSize + Object.keys(strategy.opts.weights).length,
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false }
      })
    )
  }
  await defaultPool.destroy()

  // Part 2: pool built with explicit options, handlers and weights.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts).toStrictEqual({
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler
  })
  // The context and every strategy are expected to expose the same options.
  const expectedCustomStrategyOpts = {
    retries:
      customPool.info.maxSize +
      Object.keys(customPool.opts.workerChoiceStrategyOptions.weights).length,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customContext = customPool.workerChoiceStrategyContext
  expect(customContext.opts).toStrictEqual(expectedCustomStrategyOpts)
  for (const [, strategy] of customContext.workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual(expectedCustomStrategyOpts)
  }
  await customPool.destroy()
})
327
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: invalid pool options -> error expected from the constructor.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
467
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts the options held by the context and by every strategy, for a
  // given value of the runTime/elu `median` flag (waitTime stays false).
  const expectStrategyOptions = median => {
    const context = pool.workerChoiceStrategyContext
    expect(context.opts).toStrictEqual({
      retries:
        pool.info.maxSize + Object.keys(context.opts.weights).length,
      runTime: { median },
      waitTime: { median: false },
      elu: { median },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number)
      })
    })
    for (const [, strategy] of context.workerChoiceStrategies) {
      // NOTE: `weights` is not asserted per strategy.
      expect(strategy.opts).toStrictEqual(
        expect.objectContaining({
          retries:
            pool.info.maxSize + Object.keys(strategy.opts.weights).length,
          runTime: { median },
          waitTime: { median: false },
          elu: { median }
        })
      )
    }
  }

  // Asserts the task statistics requirements: in this test `average` is
  // expected to be the opposite of `median` for runTime and elu.
  const expectStatisticsRequirements = median => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state: no strategy options were provided to the pool.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategyOptions(false)
  expectStatisticsRequirements(false)

  // Switch the median flag on, then back off, checking state after each change.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median },
      elu: { median }
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      runTime: { median },
      elu: { median }
    })
    expectStrategyOptions(median)
    expectStatisticsRequirements(median)
  }

  // Invalid inputs: each entry is the argument and the expected error.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
656
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Queue off: flag is false and no queue options are exposed.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Queue on: flag is true and the options carry the given concurrency.
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({
      concurrency,
      size: numberOfWorkers ** 2,
      taskStealing: true,
      tasksStealingOnBackPressure: true,
      tasksFinishedTimeout: 2000
    })
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
687
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool-level queue options, then that each worker node's
  // back pressure size equals the configured queue size.
  const expectQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const { tasksQueueBackPressureSize } of pool.workerNodes) {
      expect(tasksQueueBackPressureSize).toBe(pool.opts.tasksQueueOptions.size)
    }
  }
  const defaultQueueOptions = {
    concurrency: 1,
    size: numberOfWorkers ** 2,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000
  }
  const customQueueOptions = {
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000
  }

  expectQueueOptions(defaultQueueOptions)
  // Pass a copy so the expected object stays distinct from the input.
  pool.setTasksQueueOptions({ ...customQueueOptions })
  expectQueueOptions(customQueueOptions)
  // A partial update is expected to bring the omitted fields back to defaults.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  })
  expectQueueOptions(defaultQueueOptions)

  // Invalid inputs: each entry is the argument and the expected error.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
773
it('Verify that pool info is set', async () => {
  // Counters expected on a pool that has not run any task yet.
  const untouchedCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...untouchedCounters
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...untouchedCounters
  })
  await dynamicPool.destroy()
})
819
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Fresh measurement entry holding a new, empty CircularArray history.
  const emptyMeasurement = () => ({ history: new CircularArray() })
  pool.workerNodes.forEach(node => {
    expect(node).toBeInstanceOf(WorkerNode)
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyMeasurement(),
      waitTime: emptyMeasurement(),
      elu: {
        idle: emptyMeasurement(),
        active: emptyMeasurement()
      }
    })
  })
  await pool.destroy()
})
855
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque whose maxSize is still 0.
  const expectEmptyTasksQueues = ({ workerNodes }) => {
    for (const node of workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(Deque)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(clusterPool)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectEmptyTasksQueues(threadPool)
  await threadPool.destroy()
})
881
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must report a numeric id, the given worker type,
  // `dynamic: false` and `ready: true`.
  const expectWorkerInfo = ({ workerNodes }, workerType) => {
    for (const node of workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(clusterPool, WorkerTypes.cluster)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectWorkerInfo(threadPool, WorkerTypes.thread)
  await threadPool.destroy()
})
913
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // `started` and `ready` are expected to move together in this scenario.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  expectStartedAndReady(true)
  const alreadyStarted = new Error('Cannot start an already started pool')
  expect(() => pool.start()).toThrow(alreadyStarted)

  await pool.destroy()
  expectStartedAndReady(false)
  const alreadyDestroyed = new Error('Cannot destroy an already destroyed pool')
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyed)
})
931
it('Verify that pool can be started after initialization', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { startWorkers: false }
  )
  // Before start(): not started, not ready, no worker nodes, execute() rejects.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  const notStarted = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStarted)

  pool.start()
  // After start(): started and ready, then the ready event is awaited.
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  pool.workerNodes.forEach(node => {
    expect(node).toBeInstanceOf(WorkerNode)
  })
  await pool.destroy()
})
958
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry: execute() arguments -> error the returned promise rejects with.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [executeArgs, expectedError] of invalidCalls) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  const notStarted = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStarted)
})
981
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the expected usage object for the given executed/executing counts.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })

  // Submit all tasks synchronously, without awaiting them yet.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While tasks are in flight, each node is expected to be executing maxMultiplier tasks.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same count is expected to have moved to `executed`.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
1049
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage shape with no task in flight and the given `executed` matcher/value.
  const expectedIdleUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // All four measurement histories are expected to be empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // After the run: every node executed at least one task, at most all of them.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedIdleUsage(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node.usage)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed counter is expected to be back to zero.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(expectedIdleUsage(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1131
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No listener is registered before the test adds its own.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let readyCount = 0
  let lastReadyInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyCount += 1
    lastReadyInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  // Size and counter fields are only checked for being numbers.
  const numericFields = Object.fromEntries(
    [
      'minSize',
      'maxSize',
      'workerNodes',
      'idleWorkerNodes',
      'busyWorkerNodes',
      'executedTasks',
      'executingTasks',
      'failedTasks'
    ].map(field => [field, expect.any(Number)])
  )
  expect(lastReadyInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...numericFields
  })
  await pool.destroy()
})
1166
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  // A listener attached to the pool emitter must receive the `busy` event
  // and the pool information payload while tasks are being submitted.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyCount = 0
  let receivedInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyCount += 1
    receivedInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit all the tasks at once, then wait for every one of them.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(receivedInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1206
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  // A listener attached to the pool emitter must receive the `full` event
  // once, together with the expected pool information payload.
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullCount = 0
  let receivedInfo
  const onFull = info => {
    fullCount += 1
    receivedInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as `numberOfWorkers`, then wait for all of them.
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  expect(fullCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1245
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  // A listener attached to the pool emitter must receive the `backPressure`
  // event once when the pool reports back pressure.
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureCount = 0
  let receivedInfo
  pool.emitter.on(PoolEvents.backPressure, info => {
    backPressureCount += 1
    receivedInfo = info
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  const submittedTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers + 1) {
    submittedTasks.add(pool.execute())
    submitted += 1
  }
  await Promise.all(submittedTasks)
  expect(backPressureCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(receivedInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(pool.hasBackPressure.callCount).toBe(5)
  await pool.destroy()
})
1292
it('Verify that destroy() waits for queued tasks to finish', async () => {
  // With a tasks finished timeout of 2500 ms, destroy() is expected to
  // return only after every submitted task has reported `taskFinished`.
  const tasksFinishedTimeout = 2500
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let tasksFinished = 0
  const countTaskFinished = () => {
    tasksFinished += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', countTaskFinished)
  }
  // The returned promises are intentionally not awaited: the test measures
  // how destroy() deals with tasks that are still queued.
  for (let submitted = 0; submitted < totalTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  expect(tasksFinished).toBe(totalTasks)
  expect(elapsedTime).toBeGreaterThanOrEqual(2000)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
})
1321
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  // With a tasks finished timeout of 1000 ms, destroy() is expected to
  // return before any submitted task has reported `taskFinished`.
  const tasksFinishedTimeout = 1000
  const maxMultiplier = 4
  const totalTasks = numberOfWorkers * maxMultiplier
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout }
    }
  )
  let tasksFinished = 0
  const countTaskFinished = () => {
    tasksFinished += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', countTaskFinished)
  }
  // The returned promises are intentionally not awaited.
  for (let submitted = 0; submitted < totalTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const elapsedTime = performance.now() - destroyStart
  expect(tasksFinished).toBe(0)
  expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
1349
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Counts the async_hooks callbacks observed for the `poolifier:task`
  // asynchronous resource while a single task is executed.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      // Remember the id of the task resource so that the other callbacks
      // can filter on it.
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promises resolved inside the task resource execution scope.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    }
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always switch the hook off, even when the task rejects, so that it
    // does not stay enabled for the tests that run afterwards.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1386
it('Verify that hasTaskFunction() is working', async () => {
  // Pairs of [task function name, expected hasTaskFunction() result],
  // checked in this order against both pools.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false]
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(name)).toBe(expected)
  }
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [name, expected] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(name)).toBe(expected)
  }
  await fixedClusterPool.destroy()
})
1416
it('Verify that addTaskFunction() is working', async () => {
  // Covers argument validation first, then adding and running an `echo`
  // task function and inspecting its per-worker usage statistics.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Invalid task function names.
  await expect(
    dynamicThreadPool.addTaskFunction(0, () => {})
  ).rejects.toThrow(new TypeError('name argument must be a string'))
  await expect(
    dynamicThreadPool.addTaskFunction('', () => {})
  ).rejects.toThrow(
    new TypeError('name argument must not be an empty string')
  )

  // Invalid task functions.
  const notAFunctionError = new TypeError('fn argument must be a function')
  await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
    notAFunctionError
  )
  await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
    notAFunctionError
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test'
  ])

  // Valid registration.
  const echoTaskFunction = data => {
    return data
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  // The added task function returns its input.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  // Builds a fresh empty measurement history for the expected usage.
  const emptyHistory = () => ({ history: new CircularArray() })
  for (const node of dynamicThreadPool.workerNodes) {
    expect(node.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0
      },
      runTime: emptyHistory(),
      waitTime: emptyHistory(),
      elu: {
        idle: emptyHistory(),
        active: emptyHistory()
      }
    })
  }
  await dynamicThreadPool.destroy()
})
1488
it('Verify that removeTaskFunction() is working', async () => {
  // Only a task function added through the pool can be removed: `test` is
  // rejected, while `echo` is added and then removed.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Task function names expected before `echo` is added and after it is removed.
  const initialNames = [DEFAULT_TASK_NAME, 'test']
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
    echoTaskFunction
  )
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'test',
    'echo'
  ])

  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(initialNames)
  await dynamicThreadPool.destroy()
})
1527
it('Verify that listTaskFunctionNames() is working', async () => {
  // Both pools are expected to list the same task function names, in this order.
  const expectedNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual(
    expectedNames
  )
  await dynamicThreadPool.destroy()
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual(expectedNames)
  await fixedClusterPool.destroy()
})
1555
it('Verify that setDefaultTaskFunction() is working', async () => {
  // Invalid names must be rejected with the error reported by the first
  // worker node; valid names must change the order of the listed names.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const [firstWorkerNode] = dynamicThreadPool.workerNodes
  const workerId = firstWorkerNode.info.id

  // Rejected: the name is not a string.
  const notAStringError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
  )
  await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
    notAStringError
  )

  // Rejected: the reserved default task function name.
  const reservedNameError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
  ).rejects.toThrow(reservedNameError)

  // Rejected: a task function that does not exist.
  const unknownNameError = new Error(
    `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
  )
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('unknown')
  ).rejects.toThrow(unknownNameError)

  // The failed attempts must leave the listed names untouched.
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])

  // The new default task function is listed right after DEFAULT_TASK_NAME.
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('factorial')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'factorial',
    'jsonIntegerSerialization',
    'fibonacci'
  ])
  await expect(
    dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  ).resolves.toBe(true)
  expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
    DEFAULT_TASK_NAME,
    'fibonacci',
    'jsonIntegerSerialization',
    'factorial'
  ])
  await dynamicThreadPool.destroy()
})
1609
it('Verify that multiple task functions worker is working', async () => {
  // Runs the default task function and each named one, then checks the
  // per-worker, per-task-function usage statistics.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  // Matches any measurement history stored in a CircularArray.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctionNames()) {
      expect(node.getTaskFunctionWorkerUsage(taskFunctionName)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          failed: 0,
          queued: 0,
          sequentiallyStolen: 0,
          stolen: 0
        },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: {
          idle: anyHistory(),
          active: anyHistory()
        }
      })
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The usage reported for DEFAULT_TASK_NAME must equal the usage of the
    // task function listed second in the worker node info.
    expect(node.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)).toStrictEqual(
      node.getTaskFunctionWorkerUsage(node.info.taskFunctionNames[1])
    )
  }
  await pool.destroy()
})
1674
it('Verify sendKillMessageToWorker()', async () => {
  // Worker node key 0 must resolve to undefined, while the key
  // `numberOfWorkers` must be rejected as invalid.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const validWorkerNodeKey = 0
  const invalidWorkerNodeKey = numberOfWorkers
  await expect(
    pool.sendKillMessageToWorker(validWorkerNodeKey)
  ).resolves.toBeUndefined()
  await expect(
    pool.sendKillMessageToWorker(invalidWorkerNodeKey)
  ).rejects.toStrictEqual(
    new Error(`Invalid worker node key '${numberOfWorkers}'`)
  )
  await pool.destroy()
})
1692
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  // Sends an `add` operation for an `empty` task function to worker node 0
  // and checks that its task function names include the new one.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const targetWorkerNodeKey = 0
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(targetWorkerNodeKey, addOperation)
  ).resolves.toBe(true)
  const { taskFunctionNames } = pool.workerNodes[targetWorkerNodeKey].info
  expect(taskFunctionNames).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
  await pool.destroy()
})
1712
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  // Sends an `add` operation for an `empty` task function through the pool
  // and checks that every worker node lists the new task function name.
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionName: 'empty',
    taskFunction: (() => {}).toString()
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'test',
      'empty'
    ])
  }
  await pool.destroy()
})
1735 })