perf: optimize tasks queuing implementation
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
7
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
10
11 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
12 import {
13 DynamicClusterPool,
14 DynamicThreadPool,
15 FixedClusterPool,
16 FixedThreadPool,
17 PoolEvents,
18 PoolTypes,
19 WorkerChoiceStrategies,
20 WorkerTypes,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { defaultBucketSize } from '../../lib/utility-types.cjs'
25 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
26 import { waitPoolEvents } from '../test-utils.cjs'
27
28 describe('Abstract pool test suite', () => {
// Version string read from the package.json found at '../..' relative to
// the directory containing this test file.
const testFileDirectory = dirname(fileURLToPath(import.meta.url))
const packageJsonPath = join(testFileDirectory, '../..', 'package.json')
const { version } = JSON.parse(readFileSync(packageJsonPath, 'utf8'))
// Number of workers used by the pools created in this suite.
const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose isMain() always returns false.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  isMain () {
    return false
  }
}

// Call sinon's restore() after each test.
afterEach(() => {
  restore()
})
45
it('Verify that pool can be created and destroyed', async () => {
  // Create a fixed thread pool, check its concrete type, then destroy it.
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool).toBeInstanceOf(FixedThreadPool)
  await pool.destroy()
})
54
it('Verify that pool cannot be created from a non main thread/process', () => {
  // StubPoolWithIsMain.isMain() returns false, so construction is expected to throw.
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPool).toThrow(expectedError)
})
71
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Status flags expected right after construction.
  expect(pool.started).toBe(true)
  expect(pool.starting).toBe(false)
  expect(pool.destroying).toBe(false)
  await pool.destroy()
})
82
it('Verify that filePath is checked', () => {
  // Each entry pairs constructor arguments with the error they must raise.
  const invalidFilePathCases = [
    [
      [numberOfWorkers],
      new TypeError('The worker file path must be specified'),
    ],
    [
      [numberOfWorkers, 0],
      new TypeError('The worker file path must be a string'),
    ],
    [
      [numberOfWorkers, './dummyWorker.ts'],
      new Error("Cannot find the worker file './dummyWorker.ts'"),
    ],
  ]
  for (const [constructorArgs, expectedError] of invalidFilePathCases) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(
      expectedError
    )
  }
})
94
it('Verify that numberOfWorkers is checked', () => {
  // An undefined number of workers is expected to be rejected.
  const createPool = () =>
    new FixedThreadPool(
      undefined,
      './tests/worker-files/thread/testWorker.mjs'
    )
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
108
it('Verify that a negative number of workers is checked', () => {
  // A negative number of workers is expected to raise a RangeError.
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
119
it('Verify that a non integer number of workers is checked', () => {
  // A fractional number of workers is expected to raise a TypeError.
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrow(expectedError)
})
130
it('Verify that pool arguments number and pool type are checked', () => {
  // Passing a fourth constructor argument to a fixed pool is expected to throw.
  const createPool = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.mjs',
      undefined,
      numberOfWorkers * 2
    )
  const expectedError = new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
  expect(createPool).toThrow(expectedError)
})
146
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry: pool class, minimum size, maximum size, worker file, expected error.
  const invalidSizingCases = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      ),
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      ),
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      ),
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      ),
    ],
  ]
  for (const [
    PoolClass,
    minSize,
    maxSize,
    workerFile,
    expectedError,
  ] of invalidSizingCases) {
    expect(() => new PoolClass(minSize, maxSize, workerFile)).toThrow(
      expectedError
    )
  }
})
221
it('Verify that pool options are checked', async () => {
  // First pool: no options given, so the default options are expected.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // try/finally makes sure the pool is destroyed even when an assertion
  // throws, so a failing test does not leave running workers behind.
  try {
    expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
    expect(pool.emitter.eventNames()).toStrictEqual([])
    expect(pool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: true,
      restartWorkerOnError: true,
      enableTasksQueue: false,
      workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    })
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual({
        runTime: { median: false },
        waitTime: { median: false },
        elu: { median: false },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number),
        }),
      })
    }
  } finally {
    await pool.destroy()
  }
  // Second pool: explicit options, expected to be reflected in pool.opts
  // and in every worker choice strategy's options.
  const testHandler = () => console.info('test handler executed')
  pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    {
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: {
        runTime: { median: true },
        weights: { 0: 300, 1: 200 },
      },
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: { concurrency: 2 },
      messageHandler: testHandler,
      errorHandler: testHandler,
      onlineHandler: testHandler,
      exitHandler: testHandler,
    }
  )
  try {
    expect(pool.emitter).toBeUndefined()
    expect(pool.opts).toStrictEqual({
      startWorkers: true,
      enableEvents: false,
      restartWorkerOnError: false,
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2,
        size: Math.pow(numberOfWorkers, 2),
        taskStealing: true,
        tasksStealingOnBackPressure: false,
        tasksFinishedTimeout: 2000,
      },
      workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
      workerChoiceStrategyOptions: {
        runTime: { median: true },
        weights: { 0: 300, 1: 200 },
      },
      onlineHandler: testHandler,
      messageHandler: testHandler,
      errorHandler: testHandler,
      exitHandler: testHandler,
    })
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual({
        runTime: { median: true },
        waitTime: { median: false },
        elu: { median: false },
        weights: { 0: 300, 1: 200 },
      })
    }
  } finally {
    await pool.destroy()
  }
})
303
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.mjs'
  // Each entry pairs invalid pool options with the error the constructor must throw.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions',
      },
      new TypeError('Invalid tasks queue options: must be a plain object'),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      ),
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      ),
    ],
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrow(expectedError)
  }
})
443
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts every strategy's options and the aggregated task statistics
  // requirements for the given runTime/elu median flags.
  const assertStrategiesState = (runTimeMedian, eluMedian) => {
    const { workerChoiceStrategiesContext } = pool
    for (const [, strategy] of workerChoiceStrategiesContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({
        runTime: { median: runTimeMedian },
        waitTime: { median: false },
        elu: { median: eluMedian },
        weights: expect.objectContaining({
          0: expect.any(Number),
          [pool.info.maxSize - 1]: expect.any(Number),
        }),
      })
    }
    expect(
      workerChoiceStrategiesContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: {
        aggregate: true,
        average: !runTimeMedian,
        median: runTimeMedian,
      },
      waitTime: {
        aggregate: true,
        average: true,
        median: false,
      },
      elu: {
        aggregate: true,
        average: !eluMedian,
        median: eluMedian,
      },
    })
  }

  // Initial state: no explicit strategy options.
  expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
  assertStrategiesState(false, false)

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true },
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    elu: { median: true },
  })
  assertStrategiesState(true, true)

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false },
  })
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    elu: { median: false },
  })
  assertStrategiesState(false, false)

  // Invalid options are expected to be rejected.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      ),
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      ),
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      ),
    ],
  ]
  for (const [strategyOptions, expectedError] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(strategyOptions)
    ).toThrow(expectedError)
  }
  await pool.destroy()
})
581
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Tasks queue options expected for a given concurrency once the queue is enabled.
  const expectedQueueOptions = concurrency => ({
    concurrency,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 2000,
  })
  // Asserts that the queue is disabled and carries no options.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(1))
  pool.enableTasksQueue(true, { concurrency: 2 })
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedQueueOptions(2))
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
612
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    { enableTasksQueue: true }
  )
  // Asserts the pool options and that each worker node's back pressure
  // size equals the configured tasks queue size.
  const assertQueueOptions = expectedOptions => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    for (const node of pool.workerNodes) {
      expect(node.tasksQueueBackPressureSize).toBe(
        pool.opts.tasksQueueOptions.size
      )
    }
  }

  // Options expected right after enabling the queue at construction.
  assertQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000 - 1000,
  })

  // Every option set explicitly.
  pool.setTasksQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000,
  })
  assertQueueOptions({
    concurrency: 2,
    size: 2,
    taskStealing: false,
    tasksStealingOnBackPressure: false,
    tasksFinishedTimeout: 3000,
  })

  // Only some options set; size and tasksFinishedTimeout are left out.
  pool.setTasksQueueOptions({
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
  })
  assertQueueOptions({
    concurrency: 1,
    size: Math.pow(numberOfWorkers, 2),
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    tasksFinishedTimeout: 2000,
  })

  // Invalid options are expected to be rejected.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object'),
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      ),
    ],
    [
      { concurrency: 0.2 },
      new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      ),
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      ),
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      ),
    ],
    [
      { size: 0.2 },
      new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      ),
    ],
  ]
  for (const [queueOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(queueOptions)).toThrow(
      expectedError
    )
  }
  await pool.destroy()
})
698
it('Verify that pool info is set', async () => {
  // Fixed thread pool: minimum and maximum sizes both equal numberOfWorkers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // try/finally makes sure the pool is destroyed even when the assertion
  // throws, so a failing test does not leave running workers behind.
  try {
    expect(pool.info).toStrictEqual({
      version,
      type: PoolTypes.fixed,
      worker: WorkerTypes.thread,
      started: true,
      ready: true,
      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      strategyRetries: 0,
      minSize: numberOfWorkers,
      maxSize: numberOfWorkers,
      workerNodes: numberOfWorkers,
      idleWorkerNodes: numberOfWorkers,
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0,
    })
  } finally {
    await pool.destroy()
  }
  // Dynamic cluster pool: created with half of numberOfWorkers as minimum size.
  pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  try {
    expect(pool.info).toStrictEqual({
      version,
      type: PoolTypes.dynamic,
      worker: WorkerTypes.cluster,
      started: true,
      ready: true,
      defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
      strategyRetries: 0,
      minSize: Math.floor(numberOfWorkers / 2),
      maxSize: numberOfWorkers,
      workerNodes: Math.floor(numberOfWorkers / 2),
      idleWorkerNodes: Math.floor(numberOfWorkers / 2),
      busyWorkerNodes: 0,
      executedTasks: 0,
      executingTasks: 0,
      failedTasks: 0,
    })
  } finally {
    await pool.destroy()
  }
})
746
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Builds a fresh `{ history: <CircularBuffer matcher> }` expectation.
  const historyOnly = () => ({ history: expect.any(CircularBuffer) })
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
    // All task counters are expected to start at zero.
    expect(node.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0,
      },
      runTime: historyOnly(),
      waitTime: historyOnly(),
      elu: {
        idle: historyOnly(),
        active: historyOnly(),
      },
    })
  }
  await pool.destroy()
})
782
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that every worker node of the given pool holds an empty
  // PriorityQueue with the default bucket size and priority disabled.
  const assertTasksQueuesInitialized = testedPool => {
    for (const workerNode of testedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
      expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
      expect(workerNode.tasksQueue.enablePriority).toBe(false)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // try/finally makes sure the pool is destroyed even when an assertion
  // throws, so a failing test does not leave running workers behind.
  try {
    assertTasksQueuesInitialized(pool)
  } finally {
    await pool.destroy()
  }
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    assertTasksQueuesInitialized(pool)
  } finally {
    await pool.destroy()
  }
})
812
it('Verify that pool worker info are initialized', async () => {
  // Asserts the initial info of every worker node of the given pool for
  // the expected worker type.
  const assertWorkerInfoInitialized = (testedPool, workerType) => {
    for (const workerNode of testedPool.workerNodes) {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true,
        stealing: false,
        backPressure: false,
      })
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // try/finally makes sure the pool is destroyed even when an assertion
  // throws, so a failing test does not leave running workers behind.
  try {
    assertWorkerInfoInitialized(pool, WorkerTypes.cluster)
  } finally {
    await pool.destroy()
  }
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  try {
    assertWorkerInfoInitialized(pool, WorkerTypes.thread)
  } finally {
    await pool.destroy()
  }
})
848
it('Verify that pool statuses are checked at start or destroy', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Asserts the started/ready flags reported by pool.info.
  const expectStartedAndReady = (started, ready) => {
    expect(pool.info.started).toBe(started)
    expect(pool.info.ready).toBe(ready)
  }

  expectStartedAndReady(true, true)
  // Starting a pool that is already started is expected to throw.
  const alreadyStartedError = new Error('Cannot start an already started pool')
  expect(() => pool.start()).toThrow(alreadyStartedError)
  await pool.destroy()
  expectStartedAndReady(false, false)
  // Destroying a pool a second time is expected to reject.
  const alreadyDestroyedError = new Error(
    'Cannot destroy an already destroyed pool'
  )
  await expect(pool.destroy()).rejects.toThrow(alreadyDestroyedError)
})
866
it('Verify that pool can be started after initialization', async () => {
  // startWorkers: false defers the pool start until start() is called.
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs',
    { startWorkers: false }
  )
  // Asserts the started/ready flags reported by pool.info.
  const expectStartedAndReady = (started, ready) => {
    expect(pool.info.started).toBe(started)
    expect(pool.info.ready).toBe(ready)
  }

  expectStartedAndReady(false, false)
  expect(pool.workerNodes).toStrictEqual([])
  expect(pool.readyEventEmitted).toBe(false)
  const notStartedError = new Error(
    'Cannot execute a task on not started pool'
  )
  await expect(pool.execute()).rejects.toThrow(notStartedError)

  pool.start()
  expectStartedAndReady(true, true)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.readyEventEmitted).toBe(true)
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  for (const node of pool.workerNodes) {
    expect(node).toBeInstanceOf(WorkerNode)
  }
  await pool.destroy()
})
893
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Non-string task function name.
  const nameTypeError = new TypeError('name argument must be a string')
  await expect(pool.execute(undefined, 0)).rejects.toThrow(nameTypeError)
  // Empty task function name.
  const emptyNameError = new TypeError(
    'name argument must not be an empty string'
  )
  await expect(pool.execute(undefined, '')).rejects.toThrow(emptyNameError)
  // Transfer list that is not an array.
  const transferListError = new TypeError(
    'transferList argument must be an array'
  )
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
    transferListError
  )
  // An unknown task function name rejects with a plain string.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // Once destroyed, the pool rejects any task execution.
  const notStartedError = new Error(
    'Cannot execute a task on not started pool'
  )
  await expect(pool.execute()).rejects.toThrow(notStartedError)
})
916
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const maxMultiplier = 2
  // Asserts every worker node usage for the given executed/executing counters.
  const assertUsage = (executed, executing) => {
    const historyOnly = () => ({ history: expect.any(CircularBuffer) })
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual({
        tasks: {
          executed,
          executing,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0,
        },
        runTime: historyOnly(),
        waitTime: historyOnly(),
        elu: {
          idle: historyOnly(),
          active: historyOnly(),
        },
      })
    }
  }

  // Submit maxMultiplier tasks per worker without awaiting them yet.
  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * maxMultiplier) {
    pendingTasks.add(pool.execute())
    ++submitted
  }
  assertUsage(0, maxMultiplier)
  await Promise.all(pendingTasks)
  assertUsage(maxMultiplier, 0)
  await pool.destroy()
})
984
it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Asserts that every worker node reports a settled usage with an
  // executed counter in the range ]0, submittedTasks].
  const assertSettledUsage = () => {
    const historyOnly = () => ({ history: expect.any(CircularBuffer) })
    for (const node of pool.workerNodes) {
      expect(node.usage).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: 0,
          queued: 0,
          maxQueued: 0,
          sequentiallyStolen: 0,
          stolen: 0,
          failed: 0,
        },
        runTime: historyOnly(),
        waitTime: historyOnly(),
        elu: {
          idle: historyOnly(),
          active: historyOnly(),
        },
      })
      expect(node.usage.tasks.executed).toBeGreaterThan(0)
      expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    }
  }

  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < submittedTasks) {
    pendingTasks.add(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  assertSettledUsage()
  // The same usage is expected after switching the worker choice strategy.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  assertSettledUsage()
  await pool.destroy()
})
1062
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Count the 'ready' notifications and keep the last payload received.
  let readyCount = 0
  let lastInfo
  const onReady = info => {
    readyCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.ready, onReady)
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
  expect(readyCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber,
  })
  await pool.destroy()
})
1098
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  expect(pool.emitter.eventNames()).toStrictEqual([])
  // Count the 'busy' notifications and keep the last payload received.
  let busyCount = 0
  let lastInfo
  const onBusy = info => {
    busyCount += 1
    lastInfo = info
  }
  pool.emitter.on(PoolEvents.busy, onBusy)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < numberOfWorkers * 2) {
    pendingTasks.add(pool.execute())
    ++submitted
  }
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: anyNumber,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber,
  })
  await pool.destroy()
})
1139
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicThreadPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  // Nothing listens on the pool emitter until the test registers a callback.
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let fullEventCount = 0
  let lastPoolInfo
  const onFull = info => {
    fullEventCount += 1
    lastPoolInfo = info
  }
  pool.emitter.on(PoolEvents.full, onFull)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
  // Submit twice as many tasks as the pool maximum size, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(fullEventCount).toBe(1)
  const expectedPoolInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: expect.any(Number),
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number),
  }
  expect(lastPoolInfo).toStrictEqual(expectedPoolInfo)
  await pool.destroy()
})
1179
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const poolOptions = { enableTasksQueue: true }
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs',
    poolOptions
  )
  // Force the pool to always report back pressure.
  stub(pool, 'hasBackPressure').returns(true)
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let backPressureEventCount = 0
  let lastPoolInfo
  const onBackPressure = info => {
    backPressureEventCount += 1
    lastPoolInfo = info
  }
  pool.emitter.on(PoolEvents.backPressure, onBackPressure)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
  // Submit one more task than there are workers, all at once.
  const pendingTasks = Array.from({ length: numberOfWorkers + 1 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  expect(backPressureEventCount).toBe(1)
  const expectedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    started: true,
    ready: true,
    defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
    strategyRetries: expect.any(Number),
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    stealingWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    maxQueuedTasks: expect.any(Number),
    queuedTasks: expect.any(Number),
    backPressure: true,
    stolenTasks: expect.any(Number),
    failedTasks: expect.any(Number),
  }
  expect(lastPoolInfo).toStrictEqual(expectedPoolInfo)
  expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
  await pool.destroy()
})
1228
it('Verify that destroy() waits for queued tasks to finish', async () => {
  const tasksFinishedTimeout = 2500
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout },
    }
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let tasksFinished = 0
  const countFinishedTask = () => {
    tasksFinished += 1
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', countFinishedTask)
  })
  // The tasks are deliberately not awaited: destroy() is called while some of
  // them are still queued.
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  expect(tasksFinished).toBeLessThanOrEqual(submittedTasks)
  expect(destroyDuration).toBeGreaterThanOrEqual(2000)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1257
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
  const tasksFinishedTimeout = 1000
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/asyncWorker.mjs',
    {
      enableTasksQueue: true,
      tasksQueueOptions: { tasksFinishedTimeout },
    }
  )
  const maxMultiplier = 4
  const submittedTasks = numberOfWorkers * maxMultiplier
  let tasksFinished = 0
  const countFinishedTask = () => {
    tasksFinished += 1
  }
  pool.workerNodes.forEach(workerNode => {
    workerNode.on('taskFinished', countFinishedTask)
  })
  // The tasks are deliberately not awaited: destroy() is called while some of
  // them are still queued.
  for (let submitted = 0; submitted < submittedTasks; submitted += 1) {
    pool.execute()
  }
  expect(pool.info.queuedTasks).toBeGreaterThan(0)
  const destroyStart = performance.now()
  await pool.destroy()
  const destroyDuration = performance.now() - destroyStart
  // No 'taskFinished' event is expected before destroy() returns.
  expect(tasksFinished).toBe(0)
  expect(destroyDuration).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
1285
it('Verify that pool asynchronous resource track tasks execution', async () => {
  // Checks that executing one task creates exactly one `poolifier:task` async
  // resource and that its before/after/promiseResolve hooks each run once.
  // Async id of the `poolifier:task` resource recorded by the init hook.
  let taskAsyncId
  let initCalls = 0
  let beforeCalls = 0
  let afterCalls = 0
  let resolveCalls = 0
  const hook = createHook({
    init (asyncId, type) {
      if (type === 'poolifier:task') {
        initCalls++
        taskAsyncId = asyncId
      }
    },
    before (asyncId) {
      if (asyncId === taskAsyncId) beforeCalls++
    },
    after (asyncId) {
      if (asyncId === taskAsyncId) afterCalls++
    },
    promiseResolve () {
      // Only count promises resolved while the task resource is the current
      // execution context.
      if (executionAsyncId() === taskAsyncId) resolveCalls++
    },
  })
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  hook.enable()
  try {
    await pool.execute()
  } finally {
    // Always disable the hook, even when the task rejects, so that it does not
    // stay enabled while the following tests run.
    hook.disable()
  }
  expect(initCalls).toBe(1)
  expect(beforeCalls).toBe(1)
  expect(afterCalls).toBe(1)
  expect(resolveCalls).toBe(1)
  await pool.destroy()
})
1322
it('Verify that hasTaskFunction() is working', async () => {
  // Task function names both worker files are expected to expose.
  const knownTaskFunctionNames = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci',
  ]
  // Runs the same lookups against any pool built from those worker files.
  const expectKnownTaskFunctions = pool => {
    for (const taskFunctionName of knownTaskFunctionNames) {
      expect(pool.hasTaskFunction(taskFunctionName)).toBe(true)
    }
    expect(pool.hasTaskFunction('unknown')).toBe(false)
  }

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expectKnownTaskFunctions(dynamicThreadPool)
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expectKnownTaskFunctions(fixedClusterPool)
  await fixedClusterPool.destroy()
})
1352
it('Verify that addTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  // Invalid invocations, checked one after the other:
  // [name argument, task function argument, expected rejection]
  const invalidInvocations = [
    [0, () => {}, new TypeError('name argument must be a string')],
    [
      '',
      () => {},
      new TypeError('name argument must not be an empty string'),
    ],
    ['test', 0, new TypeError('taskFunction property must be a function')],
    ['test', '', new TypeError('taskFunction property must be a function')],
    [
      'test',
      { taskFunction: 0 },
      new TypeError('taskFunction property must be a function'),
    ],
    [
      'test',
      { taskFunction: '' },
      new TypeError('taskFunction property must be a function'),
    ],
    [
      'test',
      { taskFunction: () => {}, priority: -21 },
      new RangeError("Property 'priority' must be between -20 and 19"),
    ],
    [
      'test',
      { taskFunction: () => {}, priority: 20 },
      new RangeError("Property 'priority' must be between -20 and 19"),
    ],
    [
      'test',
      { taskFunction: () => {}, strategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'"),
    ],
  ]
  for (const [name, taskFunction, expectedError] of invalidInvocations) {
    await expect(
      dynamicThreadPool.addTaskFunction(name, taskFunction)
    ).rejects.toThrow(expectedError)
  }

  const registeredStrategies = () =>
    Array.from(
      dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
    )

  // State before any task function is added on the pool side.
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ])
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
  ])

  const echoTaskFunction = data => {
    return data
  }
  const echoTaskFunctionObject = {
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  }
  await expect(
    dynamicThreadPool.addTaskFunction('echo', echoTaskFunctionObject)
  ).resolves.toBe(true)
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  const taskFunctionData = { test: 'test' }
  const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
  expect(echoResult).toStrictEqual(taskFunctionData)

  for (const workerNode of dynamicThreadPool.workerNodes) {
    // Re-query the usage on every access, one lookup per assertion.
    const echoUsage = () => workerNode.getTaskFunctionWorkerUsage('echo')
    expect(echoUsage()).toStrictEqual({
      tasks: {
        executed: expect.any(Number),
        executing: 0,
        queued: 0,
        sequentiallyStolen: 0,
        stolen: 0,
        failed: 0,
      },
      runTime: {
        history: expect.any(CircularBuffer),
      },
      waitTime: {
        history: expect.any(CircularBuffer),
      },
      elu: expect.objectContaining({
        idle: expect.objectContaining({
          history: expect.any(CircularBuffer),
        }),
        active: expect.objectContaining({
          history: expect.any(CircularBuffer),
        }),
      }),
    })
    expect(echoUsage().tasks.executed).toBeGreaterThan(0)
    // ELU statistics are optional: when set they must be within range,
    // otherwise they must be undefined.
    if (echoUsage().elu.active.aggregate != null) {
      expect(echoUsage().elu.active.aggregate).toBeGreaterThan(0)
    } else {
      expect(echoUsage().elu.active.aggregate).toBeUndefined()
    }
    if (echoUsage().elu.idle.aggregate != null) {
      expect(echoUsage().elu.idle.aggregate).toBeGreaterThanOrEqual(0)
    } else {
      expect(echoUsage().elu.idle.aggregate).toBeUndefined()
    }
    if (echoUsage().elu.utilization != null) {
      expect(echoUsage().elu.utilization).toBeGreaterThanOrEqual(0)
      expect(echoUsage().elu.utilization).toBeLessThanOrEqual(1)
    } else {
      expect(echoUsage().elu.utilization).toBeUndefined()
    }
  }
  await dynamicThreadPool.destroy()
})
1507
it('Verify that removeTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)

  const registeredStrategies = () =>
    Array.from(
      dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
    )
  // Task functions listed before adding and after removing 'echo'.
  const baselineTaskFunctionsProperties = () => [
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
  ]

  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
    baselineTaskFunctionsProperties()
  )
  await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
    new Error('Cannot remove a task function not handled on the pool side')
  )

  const echoTaskFunction = data => {
    return data
  }
  await dynamicThreadPool.addTaskFunction('echo', {
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(dynamicThreadPool.taskFunctions.size).toBe(1)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
    taskFunction: echoTaskFunction,
    strategy: WorkerChoiceStrategies.LEAST_ELU,
  })
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
    WorkerChoiceStrategies.LEAST_ELU,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    ...baselineTaskFunctionsProperties(),
    { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
  ])

  // Removing 'echo' must bring the pool back to its baseline state.
  await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
    true
  )
  expect(dynamicThreadPool.taskFunctions.size).toBe(0)
  expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
  expect(registeredStrategies()).toStrictEqual([
    WorkerChoiceStrategies.ROUND_ROBIN,
  ])
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
    baselineTaskFunctionsProperties()
  )
  await dynamicThreadPool.destroy()
})
1559
it('Verify that listTaskFunctionsProperties() is working', async () => {
  // Both worker files are expected to list the same task functions.
  const expectedTaskFunctionsProperties = () => [
    { name: DEFAULT_TASK_NAME },
    { name: 'jsonIntegerSerialization' },
    { name: 'factorial' },
    { name: 'fibonacci' },
  ]

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual(
    expectedTaskFunctionsProperties()
  )
  await dynamicThreadPool.destroy()

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual(
    expectedTaskFunctionsProperties()
  )
  await fixedClusterPool.destroy()
})
1587
it('Verify that setDefaultTaskFunction() is working', async () => {
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  const [firstWorkerNode] = dynamicThreadPool.workerNodes
  const workerId = firstWorkerNode.info.id

  // Invalid names, checked one after the other:
  // [name argument, expected rejection message]
  const invalidNames = [
    [
      0,
      `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`,
    ],
    [
      DEFAULT_TASK_NAME,
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`,
    ],
    [
      'unknown',
      `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`,
    ],
  ]
  for (const [name, expectedMessage] of invalidNames) {
    await expect(
      dynamicThreadPool.setDefaultTaskFunction(name)
    ).rejects.toThrow(new Error(expectedMessage))
  }

  // The failed attempts above leave the listing untouched.
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'jsonIntegerSerialization' },
    { name: 'factorial' },
    { name: 'fibonacci' },
  ])

  // The new default task function is listed right after the default name.
  const factorialSet =
    await dynamicThreadPool.setDefaultTaskFunction('factorial')
  expect(factorialSet).toBe(true)
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'factorial' },
    { name: 'jsonIntegerSerialization' },
    { name: 'fibonacci' },
  ])

  const fibonacciSet =
    await dynamicThreadPool.setDefaultTaskFunction('fibonacci')
  expect(fibonacciSet).toBe(true)
  expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'fibonacci' },
    { name: 'jsonIntegerSerialization' },
    { name: 'factorial' },
  ])
  await dynamicThreadPool.destroy()
})
1641
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
  )
  const data = { n: 10 }
  // Run each task function once, one after the other.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1,
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  // Usage statistics expected for every task function of a worker node.
  const expectedTaskFunctionUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: {
      idle: {
        history: expect.any(CircularBuffer),
      },
      active: {
        history: expect.any(CircularBuffer),
      },
    },
  })

  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'jsonIntegerSerialization' },
      { name: 'factorial' },
      { name: 'fibonacci' },
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.tasksQueue.enablePriority).toBe(false)
    for (const { name } of pool.listTaskFunctionsProperties()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedTaskFunctionUsage()
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the task function
    // listed right after it.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    expect(defaultUsage).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(
        workerNode.info.taskFunctionsProperties[1].name
      )
    )
  }
  await pool.destroy()
})
1711
it('Verify that mapExecute() is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
  )
  // Invalid data arguments throw synchronously.
  expect(() => pool.mapExecute()).toThrow(
    new TypeError('data argument must be a defined iterable')
  )
  expect(() => pool.mapExecute(0)).toThrow(
    new TypeError('data argument must be an iterable')
  )

  const defaultResults = await pool.mapExecute([{}, {}, {}, {}])
  expect(defaultResults).toStrictEqual([
    { ok: 1 },
    { ok: 1 },
    { ok: 1 },
    { ok: 1 },
  ])
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  // Fresh input objects and expected results for each factorial run.
  const factorialInputs = () => [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]
  const expectedFactorials = () => [
    3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
  ]

  const arrayResults = await pool.mapExecute(factorialInputs(), 'factorial')
  expect(arrayResults).toStrictEqual(expectedFactorials())
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(8)

  // Any iterable is accepted as data, here a Set.
  const setResults = await pool.mapExecute(
    new Set(factorialInputs()),
    'factorial'
  )
  expect(setResults).toStrictEqual(expectedFactorials())
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(12)
  await pool.destroy()
})
1748
it('Verify that task function objects worker is working', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
  )
  const data = { n: 10 }
  // Run each task function once, one after the other.
  expect(await pool.execute(data)).toStrictEqual({ ok: 1 })
  expect(await pool.execute(data, 'jsonIntegerSerialization')).toStrictEqual({
    ok: 1,
  })
  expect(await pool.execute(data, 'factorial')).toBe(3628800)
  expect(await pool.execute(data, 'fibonacci')).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  // Usage statistics expected for every task function of a worker node.
  const expectedTaskFunctionUsage = () => ({
    tasks: {
      executed: expect.any(Number),
      executing: 0,
      failed: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
    },
    runTime: {
      history: expect.any(CircularBuffer),
    },
    waitTime: {
      history: expect.any(CircularBuffer),
    },
    elu: {
      idle: {
        history: expect.any(CircularBuffer),
      },
      active: {
        history: expect.any(CircularBuffer),
      },
    },
  })

  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'jsonIntegerSerialization' },
      { name: 'factorial' },
      { name: 'fibonacci', priority: -5 },
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.tasksQueue.enablePriority).toBe(true)
    for (const { name } of pool.listTaskFunctionsProperties()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual(
        expectedTaskFunctionUsage()
      )
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
      ).toBeGreaterThan(0)
    }
    // The default task name must report the same usage as the task function
    // listed right after it.
    const defaultUsage = workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    expect(defaultUsage).toStrictEqual(
      workerNode.getTaskFunctionWorkerUsage(
        workerNode.info.taskFunctionsProperties[1].name
      )
    )
  }
  await pool.destroy()
})
1818
it('Verify sendKillMessageToWorker()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const killMessageSent = pool.sendKillMessageToWorker(workerNodeKey)
  await expect(killMessageSent).resolves.toBeUndefined()
  await pool.destroy()
})
1831
it('Verify sendTaskFunctionOperationToWorker()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  // Target the first worker node of the pool.
  const workerNodeKey = 0
  const emptyTaskFunction = () => {}
  // The task function is passed as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionProperties: { name: 'empty' },
    taskFunction: emptyTaskFunction.toString(),
  }
  await expect(
    pool.sendTaskFunctionOperationToWorker(workerNodeKey, addOperation)
  ).resolves.toBe(true)
  const { taskFunctionsProperties } = pool.workerNodes[workerNodeKey].info
  expect(taskFunctionsProperties).toStrictEqual([
    { name: DEFAULT_TASK_NAME },
    { name: 'test' },
    { name: 'empty' },
  ])
  await pool.destroy()
})
1855
it('Verify sendTaskFunctionOperationToWorkers()', async () => {
  const minimumWorkers = Math.floor(numberOfWorkers / 2)
  const pool = new DynamicClusterPool(
    minimumWorkers,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.cjs'
  )
  const emptyTaskFunction = () => {}
  // The task function is passed as its source text.
  const addOperation = {
    taskFunctionOperation: 'add',
    taskFunctionProperties: { name: 'empty' },
    taskFunction: emptyTaskFunction.toString(),
  }
  await expect(
    pool.sendTaskFunctionOperationToWorkers(addOperation)
  ).resolves.toBe(true)
  // Every worker node must now list the added task function.
  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
      { name: DEFAULT_TASK_NAME },
      { name: 'test' },
      { name: 'empty' },
    ])
  })
  await pool.destroy()
})
1878 })