build(deps-dev): apply updates
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
7
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
10
11 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
12 import {
13 DynamicClusterPool,
14 DynamicThreadPool,
15 FixedClusterPool,
16 FixedThreadPool,
17 PoolEvents,
18 PoolTypes,
19 WorkerChoiceStrategies,
20 WorkerTypes,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
26
27 describe('Abstract pool test suite', () => {
// Version string read from the package.json two directories above this test
// file; it is compared against the `version` field of the pool info.
const { version } = JSON.parse(
  readFileSync(
    join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
    { encoding: 'utf8' }
  )
)
// Number of workers used to size the pools created by the tests.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, used to
 * exercise pool creation from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
40
// Undo the sinon fakes installed by a test once that test has finished.
afterEach(function restoreSinonFakes () {
  restore()
})
44
it('Verify that pool can be created and destroyed', async () => {
  // Build a fixed thread pool, check its type, then tear it down.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(threadPool).toBeInstanceOf(FixedThreadPool)
  await threadPool.destroy()
})
53
it('Verify that pool cannot be created from a non main thread/process', () => {
  // The stub reports `isMain()` as false, so the constructor must throw.
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      {
        errorHandler: e => console.error(e),
      }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolOutsideMain).toThrow(expectedError)
})
70
it('Verify that pool statuses properties are set', async () => {
  const threadPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Status flags expected right after construction, checked in this order.
  const expectedStatuses = [
    ['started', true],
    ['starting', false],
    ['destroying', false],
  ]
  for (const [statusName, expectedValue] of expectedStatuses) {
    expect(threadPool[statusName]).toBe(expectedValue)
  }
  await threadPool.destroy()
})
81
it('Verify that filePath is checked', () => {
  // Each case pairs constructor arguments with the error they must trigger.
  const invalidFilePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified'),
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string'),
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'"),
    ],
  ]
  for (const [poolArgs, expectedError] of invalidFilePathCases) {
    expect(() => new FixedThreadPool(...poolArgs)).toThrow(expectedError)
  }
})
93
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers must be refused by the constructor.
  const createPoolWithoutSize = () =>
    new FixedThreadPool(undefined, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrow(expectedError)
})
107
it('Verify that a negative number of workers is checked', () => {
  // A negative number of workers must be refused by the constructor.
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrow(expectedError)
})
118
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers must be refused by the constructor.
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrow(expectedError)
})
129
it('Verify that pool arguments number and pool type are checked', () => {
  // A fixed pool must refuse a fourth (maximum number of workers) argument.
  const fixedPoolArgs = [
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    undefined,
    numberOfWorkers * 2,
  ]
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(() => new FixedThreadPool(...fixedPoolArgs)).toThrow(expectedError)
})
145
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'
  // Invalid (min, max) combinations and the error each one must trigger.
  const invalidSizingCases = [
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: undefined,
      workerFile: clusterWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      ),
    },
    {
      Pool: DynamicThreadPool,
      min: 0.5,
      max: 1,
      workerFile: threadWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      ),
    },
    {
      Pool: DynamicClusterPool,
      min: 0,
      max: 0.5,
      workerFile: clusterWorkerFile,
      expectedError: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      ),
    },
    {
      Pool: DynamicThreadPool,
      min: 2,
      max: 1,
      workerFile: threadWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      ),
    },
    {
      Pool: DynamicThreadPool,
      min: 0,
      max: 0,
      workerFile: threadWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      ),
    },
    {
      Pool: DynamicClusterPool,
      min: 1,
      max: 1,
      workerFile: clusterWorkerFile,
      expectedError: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      ),
    },
  ]
  for (const sizingCase of invalidSizingCases) {
    const { Pool, min, max, workerFile, expectedError } = sizingCase
    expect(() => new Pool(min, max, workerFile)).toThrow(expectedError)
  }
})
220
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'

  // First pool: built without options, it must expose the default options.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  const expectedDefaultOpts = {
    startWorkers: true,
    enableEvents: true,
    restartWorkerOnError: true,
    enableTasksQueue: false,
    workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
  }
  expect(pool.opts).toStrictEqual(expectedDefaultOpts)
  for (const [, strategy] of pool.workerChoiceStrategiesContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false },
      weights: expect.objectContaining({
        0: expect.any(Number),
        [pool.info.maxSize - 1]: expect.any(Number),
      }),
    })
  }
  await pool.destroy()

  // Second pool: every option given explicitly must be reflected in `opts`.
  const testHandler = () => console.info('test handler executed')
  const customWeights = () => ({ 0: 300, 1: 200 })
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: customWeights(),
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler,
  })
  expect(pool.emitter).toBeUndefined()
  const expectedCustomOpts = {
    startWorkers: true,
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: 2,
      size: Math.pow(numberOfWorkers, 2),
      taskStealing: true,
      tasksStealingOnBackPressure: false,
      tasksFinishedTimeout: 2000,
    },
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: customWeights(),
    },
    onlineHandler: testHandler,
    messageHandler: testHandler,
    errorHandler: testHandler,
    exitHandler: testHandler,
  }
  expect(pool.opts).toStrictEqual(expectedCustomOpts)
  for (const [, strategy] of pool.workerChoiceStrategiesContext
    .workerChoiceStrategies) {
    expect(strategy.opts).toStrictEqual({
      runTime: { median: true },
      waitTime: { median: false },
      elu: { median: false },
      weights: customWeights(),
    })
  }
  await pool.destroy()
})
302
it('Verify that pool options are validated', () => {
  // Builds a case whose options enable the tasks queue with the given settings.
  const tasksQueueCase = (tasksQueueOptions, expectedError) => [
    { enableTasksQueue: true, tasksQueueOptions },
    expectedError,
  ]
  // Each case pairs invalid pool options with the error they must trigger.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
    tasksQueueCase(
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ),
    tasksQueueCase(
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ),
    tasksQueueCase(
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ),
    tasksQueueCase(
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ),
    tasksQueueCase(
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ),
    tasksQueueCase(
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ),
    tasksQueueCase(
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ),
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.mjs',
          poolOptions
        )
    ).toThrow(expectedError)
  }
})
442
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Checks the options seen by every strategy, then the task statistics
  // requirements, for the given runTime/elu `median` setting.
  const expectStrategiesState = useMedian => {
    for (const [, strategy] of pool.workerChoiceStrategiesContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({
        runTime: { median: useMedian },
        waitTime: { median: false },
        elu: { median: useMedian },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number),
        }),
      })
    }
    expect(
      pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !useMedian, median: useMedian },
      waitTime: { aggregate: true, average: true, median: false },
      elu: { aggregate: true, average: !useMedian, median: useMedian },
    })
  }

  // Initial state: no options stored on the pool, averages are used.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  expectStrategiesState(false)

  // Switch runTime and elu to median, then back to average.
  for (const useMedian of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({
      runTime: { median: useMedian },
      elu: { median: useMedian },
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      runTime: { median: useMedian },
      elu: { median: useMedian },
    })
    expectStrategiesState(useMedian)
  }

  // Invalid options must be refused.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      ),
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
  ]
  for (const [strategyOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(strategyOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
580
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  // Tasks queue options expected once the queue is enabled with the given
  // concurrency.
  const queueOptionsWith = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 2000,
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )

  // Disabled by default.
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()

  // Enabled without options.
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(1))

  // Enabled with an explicit concurrency.
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptionsWith(2))

  // Disabled again: the options are dropped.
  pool.enableTasksQueue(false)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  await pool.destroy()
})
611
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Checks the pool tasks queue options, then that every worker node back
  // pressure size equals the configured queue size.
  const expectQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  expectQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 2000,
  })

  // Every option given explicitly.
  const explicitOptions = () => ({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000,
  })
  pool.setTasksQueueOptions(explicitOptions())
  expectQueueOptions(explicitOptions())

  // Only some options given.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
  })
  expectQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000,
  })

  // Invalid options must be refused.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object'),
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer'),
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      ),
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      ),
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer'),
    ],
  ]
  for (const [queueOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(expectedError)
  }
  await pool.destroy()
})
697
it('Verify that pool info is set', async () => {
  // Fields expected to be identical for both freshly started, idle pools.
  const idlePoolInfo = {
    version,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: 0,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0,
  }

  // Fixed thread pool.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
  })
  await pool.destroy()

  // Dynamic cluster pool.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  pool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.info).toStrictEqual({
    ...idlePoolInfo,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
  })
  await pool.destroy()
})
745
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    // Every worker node must start with zeroed task counters and history
    // buffers for each measurement.
    for (const workerNode of pool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.usage).toStrictEqual({
        tasks: {
          executed: 0,
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0,
        },
        runTime: {
          history: expect.any(CircularBuffer),
        },
        waitTime: {
          history: expect.any(CircularBuffer),
        },
        elu: {
          idle: {
            history: expect.any(CircularBuffer),
          },
          active: {
            history: expect.any(CircularBuffer),
          },
        },
      })
    }
  } finally {
    // Destroy the pool even when an assertion above fails, so that a failing
    // test does not leave cluster workers running.
    await pool.destroy()
  }
})
781
it('Verify that pool worker tasks queue are initialized', async () => {
  // Checks that every worker node of the given pool owns an empty tasks queue
  // with the default bucket size and priority disabled.
  const expectPristineTasksQueues = checkedPool => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.tasksQueue).toBeInstanceOf(PriorityQueue)
      expect(node.tasksQueue.size).toBe(0)
      expect(node.tasksQueue.maxSize).toBe(0)
      expect(node.tasksQueue.bucketSize).toBe(defaultBucketSize)
      expect(node.tasksQueue.enablePriority).toBe(false)
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectPristineTasksQueues(clusterPool)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectPristineTasksQueues(threadPool)
  await threadPool.destroy()
})
811
it('Verify that pool worker info are initialized', async () => {
  // Checks the info of every worker node of the given pool against the
  // expected worker type.
  const expectInitialWorkerInfo = (checkedPool, workerType) => {
    for (const node of checkedPool.workerNodes) {
      expect(node).toBeInstanceOf(WorkerNode)
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true,
        stealing: false,
        backPressure: false,
      })
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expectInitialWorkerInfo(clusterPool, WorkerTypes.cluster)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expectInitialWorkerInfo(threadPool, WorkerTypes.thread)
  await threadPool.destroy()
})
847
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // `started` and `ready` are expected to move together in this scenario.
  const expectStartedAndReady = expected => {
    expect(pool.info.started).toBe(expected)
    expect(pool.info.ready).toBe(expected)
  }

  // Starting an already started pool must throw.
  expectStartedAndReady(true)
  expect(() => pool.start()).toThrow(
    new Error('Cannot start an already started pool')
  )

  // Destroying an already destroyed pool must reject.
  await pool.destroy()
  expectStartedAndReady(false)
  await expect(pool.destroy()).rejects.toThrow(
    new Error('Cannot destroy an already destroyed pool')
  )
})
865
it('Verify that pool can be started after initialization', async () => {
  const poolOptions = { startWorkers: false }
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    poolOptions
  )

  // Before start(): no worker node, nothing ready, tasks are refused.
  expect(pool.info.started).toBe(false)
  expect(pool.info.ready).toBe(false)
  expect(pool.workerNodes).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  const notStartedError = new Error('Cannot execute a task on not started pool')
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  // After start(): worker nodes exist and the ready event gets emitted.
  pool.start()
  expect(pool.info.started).toBe(true)
  expect(pool.info.ready).toBe(true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
892
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Each case pairs execute() arguments with the error the call must reject
  // with; the cases are awaited one after the other.
  const invalidArgumentsCases = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string'),
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array'),
    ],
  ]
  for (const [executeArgs, expectedError] of invalidArgumentsCases) {
    await expect(pool.execute(...executeArgs)).rejects.toThrow(expectedError)
  }
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  // Once destroyed, the pool refuses any task.
  await pool.destroy()
  await expect(pool.execute()).rejects.toThrow(
    new Error('Cannot execute a task on not started pool')
  )
})
915
it('Verify that pool worker tasks usage are computed', async () => {
  // Usage expected on every worker node for the given task counters.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: {
      idle: {
        history: expect.any(CircularBuffer),
      },
      active: {
        history: expect.any(CircularBuffer),
      },
    },
  })
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const maxMultiplier = 2
  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < numberOfWorkers * maxMultiplier; taskIndex++) {
    pendingTasks.add(pool.execute())
  }

  // Checked before awaiting the tasks: each worker node is expected to be
  // executing `maxMultiplier` tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }

  // Once every task has settled, they are all accounted as executed.
  await Promise.all(pendingTasks)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
983
it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Checks that every worker node reports executed tasks and nothing pending.
  const expectExecutedTasksUsage = () => {
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0,
        },
        runTime: {
          history: expect.any(CircularBuffer),
        },
        waitTime: {
          history: expect.any(CircularBuffer),
        },
        elu: {
          idle: {
            history: expect.any(CircularBuffer),
          },
          active: {
            history: expect.any(CircularBuffer),
          },
        },
      })
      expect(node.usage.tasks.executed).toBeGreaterThan(0)
      expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    }
  }

  const pendingTasks = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    pendingTasks.add(pool.execute())
  }
  await Promise.all(pendingTasks)
  expectExecutedTasksUsage()

  // Changing the strategy must leave the usage statistics untouched.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  expectExecutedTasksUsage()
  await pool.destroy()
})
1061
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])

  // Record how many times the event fires and the last info it carried.
  const received = { count: 0, info: undefined }
  pool.emitter.on(PoolEvents.ready, info => {
    received.count += 1
    received.info = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(received.count).toBe(1)

  // Only the type of the numeric fields is checked.
  const numericFields = [
    'strategyRetries',
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'failedTasks',
  ]
  expect(received.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    ...Object.fromEntries(
      numericFields.map(field => [field, expect.any(Number)])
    ),
  })
  await pool.destroy()
})
1097
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    const promises = new Set()
    // Count the `busy` events and keep the last pool info they carried.
    let poolBusy = 0
    let poolInfo
    pool.emitter.on(PoolEvents.busy, info => {
      ++poolBusy
      poolInfo = info
    })
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.add(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
    expect(poolInfo).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      strategyRetries: expect.any(Number),
      minSize: expect.any(Number),
      maxSize: expect.any(Number),
      workerNodes: expect.any(Number),
      idleWorkerNodes: expect.any(Number),
      busyWorkerNodes: expect.any(Number),
      executedTasks: expect.any(Number),
      executingTasks: expect.any(Number),
      failedTasks: expect.any(Number),
    })
  } finally {
    // Destroy the pool even when an assertion above fails, so that a failing
    // test does not leave worker threads running.
    await pool.destroy()
  }
})
1138
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // A freshly created pool emitter has no registered listener.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEventsCount = 0
  let lastFullInfo
  const onFull = info => {
    fullEventsCount += 1
    lastFullInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the pool maximum size and wait for all of them.
  const submittedTasks = new Set()
  const tasksToSubmit = numberOfWorkers * 2
  for (let taskNumber = 1; taskNumber <= tasksToSubmit; taskNumber++) {
    submittedTasks.add(pool.execute())
  }
  await Promise.all(submittedTasks)
  expect(fullEventsCount).toBe(1)
  // Shape of the pool information payload received with the event.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: expect.any(Number),
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
  }
  expect(lastFullInfo).toStrictEqual(expectedPoolInfo)
  await pool.destroy()
})
1178
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // Force the pool to always report back pressure; the stub is restored by the suite afterEach() hook.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEventsCount = 0
  let lastBackPressureInfo
  const onBackPressure = info => {
    backPressureEventsCount += 1
    lastBackPressureInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers and wait for all of them.
  const submittedTasks = new Set()
  const tasksToSubmit = numberOfWorkers + 1
  for (let taskNumber = 1; taskNumber <= tasksToSubmit; taskNumber++) {
    submittedTasks.add(pool.execute())
  }
  await Promise.all(submittedTasks)
  expect(backPressureEventsCount).toBe(1)
  // Shape of the pool information payload received with the event, tasks queue fields included.
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: expect.any(Number),
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    stealingWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number),
  }
  expect(lastBackPressureInfo).toStrictEqual(expectedPoolInfo)
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
1227
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const poolOptions = {
    enableTasksQueue: true,
    tasksQueueOptions: { tasksFinishedTimeout },
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    poolOptions
  )
  const maxMultiplier = 4
  const submittedTasksCount = numberOfWorkers * maxMultiplier
  // Count every 'taskFinished' event emitted by any worker node.
  let finishedTasksCount = 0
  const onTaskFinished = () => {
    finishedTasksCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than there are workers so that some of them get queued.
  for (let taskNumber = 1; taskNumber <= submittedTasksCount; taskNumber++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  // Measure how long the pool destruction takes.
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  expect(finishedTasksCount).toBeLessThanOrEqual(submittedTasksCount)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1256
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const poolOptions = {
    enableTasksQueue: true,
    tasksQueueOptions: { tasksFinishedTimeout },
  }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    poolOptions
  )
  const maxMultiplier = 4
  const submittedTasksCount = numberOfWorkers * maxMultiplier
  // Count every 'taskFinished' event emitted by any worker node.
  let finishedTasksCount = 0
  const onTaskFinished = () => {
    finishedTasksCount += 1
  }
  for (const node of pool.workerNodes) {
    node.on('taskFinished', onTaskFinished)
  }
  // Submit more tasks than there are workers so that some of them get queued.
  for (let taskNumber = 1; taskNumber <= submittedTasksCount; taskNumber++) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  // Measure how long the pool destruction takes.
  const destroyStartedAt = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStartedAt
  // No task is expected to have finished by the time destroy() returns.
  expect(finishedTasksCount).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1284
// Checks that executing one task creates exactly one 'poolifier:task' async
// resource and that this resource goes through a single before/after cycle
// and a single promise resolution.
it('Verify that pool asynchronous resource track tasks execution', async () => {
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    // Remember the id of the async resource created for the task.
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    // Only count promise resolutions happening inside the task execution context.
    promiseResolve () {
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    },
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    hook.enable()
    try {
      await pool.execute()
    } finally {
      // The hook is process-wide: always disable it, even when the task
      // execution rejects, so it cannot leak into the following tests.
      hook.disable()
    }
    expect(initCalls).toBe(1)
    expect(beforeCalls).toBe(1)
    expect(afterCalls).toBe(1)
    expect(resolveCalls).toBe(1)
  } finally {
    // Always release the pool workers, even when an assertion fails.
    await pool.destroy()
  }
})
1321
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names paired with the expected hasTaskFunction() result, checked in order.
  const expectations = [
    [DEFAULT_TASK_NAME, true],
    ['jsonIntegerSerialization', true],
    ['factorial', true],
    ['fibonacci', true],
    ['unknown', false],
  ]

  // Dynamic thread pool.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, isHandled] of expectations) {
    expect(dynamicThreadPool.hasTaskFunction(taskFunctionName)).toBe(isHandled)
  }
  await dynamicThreadPool.destroy()

  // Fixed cluster pool.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  for (const [taskFunctionName, isHandled] of expectations) {
    expect(fixedClusterPool.hasTaskFunction(taskFunctionName)).toBe(isHandled)
  }
  await fixedClusterPool.destroy()
})
1351
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Invalid arguments: [name, task function argument, expected rejection], checked in order.
  const noop = () => {}
  const notAFunctionError = new TypeError(
    'taskFunction property must be a function'
  )
  const priorityRangeError = new RangeError(
    "Property 'priority' must be between -20 and 19"
  )
  const rejectedCalls = [
    [0, noop, new TypeError('name argument must be a string')],
    ['', noop, new TypeError('name argument must not be an empty string')],
    ['test', 0, notAFunctionError],
    ['test', '', notAFunctionError],
    ['test', { taskFunction: 0 }, notAFunctionError],
    ['test', { taskFunction: '' }, notAFunctionError],
    ['test', { taskFunction: noop, priority: -21 }, priorityRangeError],
    ['test', { taskFunction: noop, priority: 20 }, priorityRangeError],
    [
      'test',
      { taskFunction: noop, strategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
  ]
  for (const [name, taskFunctionArgument, expectedError] of rejectedCalls) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, taskFunctionArgument)
    ).rejects.toThrow(expectedError)
  }

  // The rejected calls left the pool state untouched.
  const listStrategies = () => [
    ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
  ]
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ])
  expect(listStrategies()).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])

  // Valid addition of an 'echo' task function with its own strategy.
  const echoTaskFunction = data => data
  const echoTaskFunctionObject = {
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunctionObject)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(listStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  // The added task function can be executed and returns its input.
  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  // Asserts that a measurement is either not set or satisfies the given check.
  const expectUnsetOr = (measurement, check) => {
    if (measurement == null) {
      expect(measurement).toBeUndefined()
    } else {
      check(measurement)
    }
  }
  const expectedEchoUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: expect.objectContaining({
      idle: expect.objectContaining({
        history: expect.any(CircularBuffer),
      }),
      active: expect.objectContaining({
        history: expect.any(CircularBuffer),
      }),
    }),
  }
  for (const workerNode of dynamicThreadPool.workerNodes) {
    const echoUsage = workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage).toStrictEqual(expectedEchoUsage)
    expect(echoUsage.tasks.executed).toBeGreaterThan(0)
    const { active, idle, utilization } = echoUsage.elu
    expectUnsetOr(active.aggregate, aggregate => {
      expect(aggregate).toBeGreaterThan(0)
    })
    expectUnsetOr(idle.aggregate, aggregate => {
      expect(aggregate).toBeGreaterThanOrEqual(0)
    })
    expectUnsetOr(utilization, ratio => {
      expect(ratio).toBeGreaterThanOrEqual(0)
      expect(ratio).toBeLessThanOrEqual(1)
    })
  }
  await dynamicThreadPool.destroy()
})
1506
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  // Snapshot of the worker choice strategies currently registered on the pool.
  const listStrategies = () => [
    ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
  ]

  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ])
  // 'test' is listed but was not added through the pool, so removing it is rejected.
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  // Add an 'echo' task function with its own strategy.
  const echoTaskFunction = data => data
  await dynamicThreadPool.addTaskFunction('echo', {
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(listStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  // Remove it again: the pool is back to its initial state.
  const removal = dynamicThreadPool.removeTaskFunction('echo')
  await expect(removal).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(listStrategies()).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ])
  await dynamicThreadPool.destroy()
})
1558
it('Verify that listTaskFunctionsProperties() is working', async () => {
  // Both worker files are expected to expose the same task functions, in the same order.
  const expectedTaskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'jsonIntegerSerialization' },
    { name: 'factorial' },
    { name: 'fibonacci' },
  ]

  // Dynamic thread pool.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
    expectedTaskFunctionsProperties
  )
  await dynamicThreadPool.destroy()

  // Fixed cluster pool.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual(
    expectedTaskFunctionsProperties
  )
  await fixedClusterPool.destroy()
})
1586
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const workerId = dynamicThreadPool.workerNodes[0].info.id

  // Invalid names: [name, expected rejection message], checked in order.
  const rejectedNames = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`,
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`,
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`,
    ],
  ]
  for (const [name, message] of rejectedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(new Error(message))
  }
  // The rejected calls did not reorder the task functions.
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'jsonIntegerSerialization' },
    { name: 'factorial' },
    { name: 'fibonacci' },
  ])

  // Valid names: [name, expected task functions order after the change], checked in order.
  const acceptedNames = [
    [
      'factorial',
      [
        { name: DEFAULT_TASK_NAME },
        { name: 'factorial' },
        { name: 'jsonIntegerSerialization' },
        { name: 'fibonacci' },
      ],
    ],
    [
      'fibonacci',
      [
        { name: DEFAULT_TASK_NAME },
        { name: 'fibonacci' },
        { name: 'jsonIntegerSerialization' },
        { name: 'factorial' },
      ],
    ],
  ]
  for (const [name, expectedProperties] of acceptedNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).resolves.toBe(true)
    expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
      expectedProperties
    )
  }
  await dynamicThreadPool.destroy()
})
1640
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  const data = { n: 10 }

  // Run each task function once, sequentially, and check its result.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)

  const { executingTasks, executedTasks } = pool.info
  expect(executingTasks).toBe(0)
  expect(executedTasks).toBe(4)

  // Expected per task function usage statistics on every worker node.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: {
      idle: {
        history: expect.any(CircularBuffer),
      },
      active: {
        history: expect.any(CircularBuffer),
      },
    },
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'jsonIntegerSerialization' },
      { name: 'factorial' },
      { name: 'fibonacci' },
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.tasksQueue.enablePriority).toBe(false)
    for (const { name } of pool.listTaskFunctionsProperties()) {
      const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(name)
      expect(taskFunctionUsage).toStrictEqual(expectedTaskFunctionUsage)
      expect(taskFunctionUsage.tasks.executed).toBeGreaterThan(0)
    }
    // The default task function name reports the same usage as the first named task function.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionsProperties[1].name
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
1710
it('Verify that task function objects worker is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
  )
  const data = { n: 10 }

  // Run each task function once, sequentially, and check its result.
  const defaultResult = await pool.execute(data)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    data,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(data, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(data, 'fibonacci')
  expect(fibonacciResult).toBe(55)

  const { executingTasks, executedTasks } = pool.info
  expect(executingTasks).toBe(0)
  expect(executedTasks).toBe(4)

  // Expected per task function usage statistics on every worker node.
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: {
      idle: {
        history: expect.any(CircularBuffer),
      },
      active: {
        history: expect.any(CircularBuffer),
      },
    },
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'jsonIntegerSerialization' },
      { name: 'factorial' },
      { name: 'fibonacci', priority: -5 },
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    // Here the tasks queue priority is expected to be enabled.
    expect(workerNode.tasksQueue.enablePriority).toBe(true)
    for (const { name } of pool.listTaskFunctionsProperties()) {
      const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(name)
      expect(taskFunctionUsage).toStrictEqual(expectedTaskFunctionUsage)
      expect(taskFunctionUsage.tasks.executed).toBeGreaterThan(0)
    }
    // The default task function name reports the same usage as the first named task function.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    const firstNamedUsage = workerNode.getTaskFunctionWorkerUsage(
      workerNode.info.taskFunctionsProperties[1].name
    )
    expect(defaultUsage).toStrictEqual(firstNamedUsage)
  }
  await pool.destroy()
})
1780
it('Verify sendKillMessageToWorker()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  const killRequest = pool.sendKillMessageToWorker(firstWorkerNodeKey)
  await expect(killRequest).resolves.toBeUndefined()
  await pool.destroy()
})
1793
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const firstWorkerNodeKey = 0
  // Request the addition of an empty task function, sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionProperties: { name: 'empty' },
    taskFunction: (() => {}).toString(),
  }
  const operation = pool.sendTaskFunctionOperationToWorker(
    firstWorkerNodeKey,
    addEmptyTaskFunction
  )
  await expect(operation).resolves.toBe(true)
  const { taskFunctionsProperties } = pool.workerNodes[firstWorkerNodeKey].info
  expect(taskFunctionsProperties).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'empty' },
  ])
  await pool.destroy()
})
1817
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Request the addition of an empty task function, sent as its source text.
  const addEmptyTaskFunction = {
    taskFunctionOperation: 'add',
    taskFunctionProperties: { name: 'empty' },
    taskFunction: (() => {}).toString(),
  }
  const operation = pool.sendTaskFunctionOperationToWorkers(addEmptyTaskFunction)
  await expect(operation).resolves.toBe(true)
  // Every worker node must now list the added task function.
  const expectedTaskFunctionsProperties = [
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'empty' },
  ]
  for (const { info } of pool.workerNodes) {
    expect(info.taskFunctionsProperties).toStrictEqual(
      expectedTaskFunctionsProperties
    )
  }
  await pool.destroy()
})
1840 })