refactor: remove unneeded worker choice strategy storage in intermediate
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
8 import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17 } from '../../lib/index.cjs'
18 import { CircularArray } from '../../lib/circular-array.cjs'
19 import { Deque } from '../../lib/deque.cjs'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21 import { waitPoolEvents } from '../test-utils.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
26 readFileSync(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
28 'utf8'
29 )
30 ).version
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
33 isMain () {
34 return false
35 }
36 }
37
38 afterEach(() => {
39 restore()
40 })
41
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
52 expect(
53 () =>
54 new StubPoolWithIsMain(
55 numberOfWorkers,
56 './tests/worker-files/thread/testWorker.mjs',
57 {
58 errorHandler: e => console.error(e)
59 }
60 )
61 ).toThrow(
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
65 )
66 })
67
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
71 './tests/worker-files/thread/testWorker.mjs'
72 )
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
76 await pool.destroy()
77 })
78
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
96 './tests/worker-files/thread/testWorker.mjs'
97 )
98 ).toThrow(
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
109 ).toThrow(
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 ).toThrow(
121 new TypeError(
122 'Cannot instantiate a pool with a non safe integer number of workers'
123 )
124 )
125 })
126
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
143 it('Verify that dynamic pool sizing is checked', () => {
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.cjs'
150 )
151 ).toThrow(
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
161 './tests/worker-files/thread/testWorker.mjs'
162 )
163 ).toThrow(
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
173 './tests/worker-files/cluster/testWorker.cjs'
174 )
175 ).toThrow(
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
180 expect(
181 () =>
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
187 ).toThrow(
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
199 ).toThrow(
200 new RangeError(
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
202 )
203 )
204 expect(
205 () =>
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.cjs'
210 )
211 ).toThrow(
212 new RangeError(
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
214 )
215 )
216 })
217
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
221 './tests/worker-files/thread/testWorker.mjs'
222 )
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
225 startWorkers: true,
226 enableEvents: true,
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
230 })
231 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
232 .workerChoiceStrategies) {
233 expect(workerChoiceStrategy.opts).toStrictEqual({
234 runTime: { median: false },
235 waitTime: { median: false },
236 elu: { median: false },
237 weights: expect.objectContaining({
238 0: expect.any(Number),
239 [pool.info.maxSize - 1]: expect.any(Number)
240 })
241 })
242 }
243 await pool.destroy()
244 const testHandler = () => console.info('test handler executed')
245 pool = new FixedThreadPool(
246 numberOfWorkers,
247 './tests/worker-files/thread/testWorker.mjs',
248 {
249 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
250 workerChoiceStrategyOptions: {
251 runTime: { median: true },
252 weights: { 0: 300, 1: 200 }
253 },
254 enableEvents: false,
255 restartWorkerOnError: false,
256 enableTasksQueue: true,
257 tasksQueueOptions: { concurrency: 2 },
258 messageHandler: testHandler,
259 errorHandler: testHandler,
260 onlineHandler: testHandler,
261 exitHandler: testHandler
262 }
263 )
264 expect(pool.emitter).toBeUndefined()
265 expect(pool.opts).toStrictEqual({
266 startWorkers: true,
267 enableEvents: false,
268 restartWorkerOnError: false,
269 enableTasksQueue: true,
270 tasksQueueOptions: {
271 concurrency: 2,
272 size: Math.pow(numberOfWorkers, 2),
273 taskStealing: true,
274 tasksStealingOnBackPressure: true,
275 tasksFinishedTimeout: 2000
276 },
277 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
278 workerChoiceStrategyOptions: {
279 runTime: { median: true },
280 weights: { 0: 300, 1: 200 }
281 },
282 onlineHandler: testHandler,
283 messageHandler: testHandler,
284 errorHandler: testHandler,
285 exitHandler: testHandler
286 })
287 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
288 .workerChoiceStrategies) {
289 expect(workerChoiceStrategy.opts).toStrictEqual({
290 runTime: { median: true },
291 waitTime: { median: false },
292 elu: { median: false },
293 weights: { 0: 300, 1: 200 }
294 })
295 }
296 await pool.destroy()
297 })
298
299 it('Verify that pool options are validated', () => {
300 expect(
301 () =>
302 new FixedThreadPool(
303 numberOfWorkers,
304 './tests/worker-files/thread/testWorker.mjs',
305 {
306 workerChoiceStrategy: 'invalidStrategy'
307 }
308 )
309 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
310 expect(
311 () =>
312 new FixedThreadPool(
313 numberOfWorkers,
314 './tests/worker-files/thread/testWorker.mjs',
315 {
316 workerChoiceStrategyOptions: { weights: {} }
317 }
318 )
319 ).toThrow(
320 new Error(
321 'Invalid worker choice strategy options: must have a weight for each worker node'
322 )
323 )
324 expect(
325 () =>
326 new FixedThreadPool(
327 numberOfWorkers,
328 './tests/worker-files/thread/testWorker.mjs',
329 {
330 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
331 }
332 )
333 ).toThrow(
334 new Error(
335 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
336 )
337 )
338 expect(
339 () =>
340 new FixedThreadPool(
341 numberOfWorkers,
342 './tests/worker-files/thread/testWorker.mjs',
343 {
344 enableTasksQueue: true,
345 tasksQueueOptions: 'invalidTasksQueueOptions'
346 }
347 )
348 ).toThrow(
349 new TypeError('Invalid tasks queue options: must be a plain object')
350 )
351 expect(
352 () =>
353 new FixedThreadPool(
354 numberOfWorkers,
355 './tests/worker-files/thread/testWorker.mjs',
356 {
357 enableTasksQueue: true,
358 tasksQueueOptions: { concurrency: 0 }
359 }
360 )
361 ).toThrow(
362 new RangeError(
363 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
364 )
365 )
366 expect(
367 () =>
368 new FixedThreadPool(
369 numberOfWorkers,
370 './tests/worker-files/thread/testWorker.mjs',
371 {
372 enableTasksQueue: true,
373 tasksQueueOptions: { concurrency: -1 }
374 }
375 )
376 ).toThrow(
377 new RangeError(
378 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
379 )
380 )
381 expect(
382 () =>
383 new FixedThreadPool(
384 numberOfWorkers,
385 './tests/worker-files/thread/testWorker.mjs',
386 {
387 enableTasksQueue: true,
388 tasksQueueOptions: { concurrency: 0.2 }
389 }
390 )
391 ).toThrow(
392 new TypeError('Invalid worker node tasks concurrency: must be an integer')
393 )
394 expect(
395 () =>
396 new FixedThreadPool(
397 numberOfWorkers,
398 './tests/worker-files/thread/testWorker.mjs',
399 {
400 enableTasksQueue: true,
401 tasksQueueOptions: { size: 0 }
402 }
403 )
404 ).toThrow(
405 new RangeError(
406 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
407 )
408 )
409 expect(
410 () =>
411 new FixedThreadPool(
412 numberOfWorkers,
413 './tests/worker-files/thread/testWorker.mjs',
414 {
415 enableTasksQueue: true,
416 tasksQueueOptions: { size: -1 }
417 }
418 )
419 ).toThrow(
420 new RangeError(
421 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
422 )
423 )
424 expect(
425 () =>
426 new FixedThreadPool(
427 numberOfWorkers,
428 './tests/worker-files/thread/testWorker.mjs',
429 {
430 enableTasksQueue: true,
431 tasksQueueOptions: { size: 0.2 }
432 }
433 )
434 ).toThrow(
435 new TypeError('Invalid worker node tasks queue size: must be an integer')
436 )
437 })
438
439 it('Verify that pool worker choice strategy options can be set', async () => {
440 const pool = new FixedThreadPool(
441 numberOfWorkers,
442 './tests/worker-files/thread/testWorker.mjs',
443 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
444 )
445 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
446 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
447 .workerChoiceStrategies) {
448 expect(workerChoiceStrategy.opts).toStrictEqual({
449 runTime: { median: false },
450 waitTime: { median: false },
451 elu: { median: false },
452 weights: expect.objectContaining({
453 0: expect.any(Number),
454 [pool.info.maxSize - 1]: expect.any(Number)
455 })
456 })
457 }
458 expect(
459 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 ).toStrictEqual({
461 runTime: {
462 aggregate: true,
463 average: true,
464 median: false
465 },
466 waitTime: {
467 aggregate: false,
468 average: false,
469 median: false
470 },
471 elu: {
472 aggregate: true,
473 average: true,
474 median: false
475 }
476 })
477 pool.setWorkerChoiceStrategyOptions({
478 runTime: { median: true },
479 elu: { median: true }
480 })
481 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
482 runTime: { median: true },
483 elu: { median: true }
484 })
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual({
488 runTime: { median: true },
489 waitTime: { median: false },
490 elu: { median: true },
491 weights: expect.objectContaining({
492 0: expect.any(Number),
493 [pool.info.maxSize - 1]: expect.any(Number)
494 })
495 })
496 }
497 expect(
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
499 ).toStrictEqual({
500 runTime: {
501 aggregate: true,
502 average: false,
503 median: true
504 },
505 waitTime: {
506 aggregate: false,
507 average: false,
508 median: false
509 },
510 elu: {
511 aggregate: true,
512 average: false,
513 median: true
514 }
515 })
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: false },
518 elu: { median: false }
519 })
520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
521 runTime: { median: false },
522 elu: { median: false }
523 })
524 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
525 .workerChoiceStrategies) {
526 expect(workerChoiceStrategy.opts).toStrictEqual({
527 runTime: { median: false },
528 waitTime: { median: false },
529 elu: { median: false },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
533 })
534 })
535 }
536 expect(
537 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
538 ).toStrictEqual({
539 runTime: {
540 aggregate: true,
541 average: true,
542 median: false
543 },
544 waitTime: {
545 aggregate: false,
546 average: false,
547 median: false
548 },
549 elu: {
550 aggregate: true,
551 average: true,
552 median: false
553 }
554 })
555 expect(() =>
556 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
557 ).toThrow(
558 new TypeError(
559 'Invalid worker choice strategy options: must be a plain object'
560 )
561 )
562 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
563 new Error(
564 'Invalid worker choice strategy options: must have a weight for each worker node'
565 )
566 )
567 expect(() =>
568 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
569 ).toThrow(
570 new Error(
571 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
572 )
573 )
574 await pool.destroy()
575 })
576
577 it('Verify that pool tasks queue can be enabled/disabled', async () => {
578 const pool = new FixedThreadPool(
579 numberOfWorkers,
580 './tests/worker-files/thread/testWorker.mjs'
581 )
582 expect(pool.opts.enableTasksQueue).toBe(false)
583 expect(pool.opts.tasksQueueOptions).toBeUndefined()
584 pool.enableTasksQueue(true)
585 expect(pool.opts.enableTasksQueue).toBe(true)
586 expect(pool.opts.tasksQueueOptions).toStrictEqual({
587 concurrency: 1,
588 size: Math.pow(numberOfWorkers, 2),
589 taskStealing: true,
590 tasksStealingOnBackPressure: true,
591 tasksFinishedTimeout: 2000
592 })
593 pool.enableTasksQueue(true, { concurrency: 2 })
594 expect(pool.opts.enableTasksQueue).toBe(true)
595 expect(pool.opts.tasksQueueOptions).toStrictEqual({
596 concurrency: 2,
597 size: Math.pow(numberOfWorkers, 2),
598 taskStealing: true,
599 tasksStealingOnBackPressure: true,
600 tasksFinishedTimeout: 2000
601 })
602 pool.enableTasksQueue(false)
603 expect(pool.opts.enableTasksQueue).toBe(false)
604 expect(pool.opts.tasksQueueOptions).toBeUndefined()
605 await pool.destroy()
606 })
607
608 it('Verify that pool tasks queue options can be set', async () => {
609 const pool = new FixedThreadPool(
610 numberOfWorkers,
611 './tests/worker-files/thread/testWorker.mjs',
612 { enableTasksQueue: true }
613 )
614 expect(pool.opts.tasksQueueOptions).toStrictEqual({
615 concurrency: 1,
616 size: Math.pow(numberOfWorkers, 2),
617 taskStealing: true,
618 tasksStealingOnBackPressure: true,
619 tasksFinishedTimeout: 2000
620 })
621 for (const workerNode of pool.workerNodes) {
622 expect(workerNode.tasksQueueBackPressureSize).toBe(
623 pool.opts.tasksQueueOptions.size
624 )
625 }
626 pool.setTasksQueueOptions({
627 concurrency: 2,
628 size: 2,
629 taskStealing: false,
630 tasksStealingOnBackPressure: false,
631 tasksFinishedTimeout: 3000
632 })
633 expect(pool.opts.tasksQueueOptions).toStrictEqual({
634 concurrency: 2,
635 size: 2,
636 taskStealing: false,
637 tasksStealingOnBackPressure: false,
638 tasksFinishedTimeout: 3000
639 })
640 for (const workerNode of pool.workerNodes) {
641 expect(workerNode.tasksQueueBackPressureSize).toBe(
642 pool.opts.tasksQueueOptions.size
643 )
644 }
645 pool.setTasksQueueOptions({
646 concurrency: 1,
647 taskStealing: true,
648 tasksStealingOnBackPressure: true
649 })
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 concurrency: 1,
652 size: Math.pow(numberOfWorkers, 2),
653 taskStealing: true,
654 tasksStealingOnBackPressure: true,
655 tasksFinishedTimeout: 2000
656 })
657 for (const workerNode of pool.workerNodes) {
658 expect(workerNode.tasksQueueBackPressureSize).toBe(
659 pool.opts.tasksQueueOptions.size
660 )
661 }
662 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
663 new TypeError('Invalid tasks queue options: must be a plain object')
664 )
665 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
666 new RangeError(
667 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
668 )
669 )
670 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
671 new RangeError(
672 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
673 )
674 )
675 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
676 new TypeError('Invalid worker node tasks concurrency: must be an integer')
677 )
678 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
679 new RangeError(
680 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
681 )
682 )
683 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
684 new RangeError(
685 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
686 )
687 )
688 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
689 new TypeError('Invalid worker node tasks queue size: must be an integer')
690 )
691 await pool.destroy()
692 })
693
694 it('Verify that pool info is set', async () => {
695 let pool = new FixedThreadPool(
696 numberOfWorkers,
697 './tests/worker-files/thread/testWorker.mjs'
698 )
699 expect(pool.info).toStrictEqual({
700 version,
701 type: PoolTypes.fixed,
702 worker: WorkerTypes.thread,
703 started: true,
704 ready: true,
705 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
706 minSize: numberOfWorkers,
707 maxSize: numberOfWorkers,
708 workerNodes: numberOfWorkers,
709 idleWorkerNodes: numberOfWorkers,
710 busyWorkerNodes: 0,
711 executedTasks: 0,
712 executingTasks: 0,
713 failedTasks: 0
714 })
715 await pool.destroy()
716 pool = new DynamicClusterPool(
717 Math.floor(numberOfWorkers / 2),
718 numberOfWorkers,
719 './tests/worker-files/cluster/testWorker.cjs'
720 )
721 expect(pool.info).toStrictEqual({
722 version,
723 type: PoolTypes.dynamic,
724 worker: WorkerTypes.cluster,
725 started: true,
726 ready: true,
727 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
728 minSize: Math.floor(numberOfWorkers / 2),
729 maxSize: numberOfWorkers,
730 workerNodes: Math.floor(numberOfWorkers / 2),
731 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
732 busyWorkerNodes: 0,
733 executedTasks: 0,
734 executingTasks: 0,
735 failedTasks: 0
736 })
737 await pool.destroy()
738 })
739
740 it('Verify that pool worker tasks usage are initialized', async () => {
741 const pool = new FixedClusterPool(
742 numberOfWorkers,
743 './tests/worker-files/cluster/testWorker.cjs'
744 )
745 for (const workerNode of pool.workerNodes) {
746 expect(workerNode).toBeInstanceOf(WorkerNode)
747 expect(workerNode.usage).toStrictEqual({
748 tasks: {
749 executed: 0,
750 executing: 0,
751 queued: 0,
752 maxQueued: 0,
753 sequentiallyStolen: 0,
754 stolen: 0,
755 failed: 0
756 },
757 runTime: {
758 history: new CircularArray()
759 },
760 waitTime: {
761 history: new CircularArray()
762 },
763 elu: {
764 idle: {
765 history: new CircularArray()
766 },
767 active: {
768 history: new CircularArray()
769 }
770 }
771 })
772 }
773 await pool.destroy()
774 })
775
776 it('Verify that pool worker tasks queue are initialized', async () => {
777 let pool = new FixedClusterPool(
778 numberOfWorkers,
779 './tests/worker-files/cluster/testWorker.cjs'
780 )
781 for (const workerNode of pool.workerNodes) {
782 expect(workerNode).toBeInstanceOf(WorkerNode)
783 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
784 expect(workerNode.tasksQueue.size).toBe(0)
785 expect(workerNode.tasksQueue.maxSize).toBe(0)
786 }
787 await pool.destroy()
788 pool = new DynamicThreadPool(
789 Math.floor(numberOfWorkers / 2),
790 numberOfWorkers,
791 './tests/worker-files/thread/testWorker.mjs'
792 )
793 for (const workerNode of pool.workerNodes) {
794 expect(workerNode).toBeInstanceOf(WorkerNode)
795 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
796 expect(workerNode.tasksQueue.size).toBe(0)
797 expect(workerNode.tasksQueue.maxSize).toBe(0)
798 }
799 await pool.destroy()
800 })
801
802 it('Verify that pool worker info are initialized', async () => {
803 let pool = new FixedClusterPool(
804 numberOfWorkers,
805 './tests/worker-files/cluster/testWorker.cjs'
806 )
807 for (const workerNode of pool.workerNodes) {
808 expect(workerNode).toBeInstanceOf(WorkerNode)
809 expect(workerNode.info).toStrictEqual({
810 id: expect.any(Number),
811 type: WorkerTypes.cluster,
812 dynamic: false,
813 ready: true,
814 stealing: false
815 })
816 }
817 await pool.destroy()
818 pool = new DynamicThreadPool(
819 Math.floor(numberOfWorkers / 2),
820 numberOfWorkers,
821 './tests/worker-files/thread/testWorker.mjs'
822 )
823 for (const workerNode of pool.workerNodes) {
824 expect(workerNode).toBeInstanceOf(WorkerNode)
825 expect(workerNode.info).toStrictEqual({
826 id: expect.any(Number),
827 type: WorkerTypes.thread,
828 dynamic: false,
829 ready: true,
830 stealing: false
831 })
832 }
833 await pool.destroy()
834 })
835
836 it('Verify that pool statuses are checked at start or destroy', async () => {
837 const pool = new FixedThreadPool(
838 numberOfWorkers,
839 './tests/worker-files/thread/testWorker.mjs'
840 )
841 expect(pool.info.started).toBe(true)
842 expect(pool.info.ready).toBe(true)
843 expect(() => pool.start()).toThrow(
844 new Error('Cannot start an already started pool')
845 )
846 await pool.destroy()
847 expect(pool.info.started).toBe(false)
848 expect(pool.info.ready).toBe(false)
849 await expect(pool.destroy()).rejects.toThrow(
850 new Error('Cannot destroy an already destroyed pool')
851 )
852 })
853
854 it('Verify that pool can be started after initialization', async () => {
855 const pool = new FixedClusterPool(
856 numberOfWorkers,
857 './tests/worker-files/cluster/testWorker.cjs',
858 {
859 startWorkers: false
860 }
861 )
862 expect(pool.info.started).toBe(false)
863 expect(pool.info.ready).toBe(false)
864 expect(pool.readyEventEmitted).toBe(false)
865 expect(pool.workerNodes).toStrictEqual([])
866 await expect(pool.execute()).rejects.toThrow(
867 new Error('Cannot execute a task on not started pool')
868 )
869 pool.start()
870 expect(pool.info.started).toBe(true)
871 expect(pool.info.ready).toBe(true)
872 await waitPoolEvents(pool, PoolEvents.ready, 1)
873 expect(pool.readyEventEmitted).toBe(true)
874 expect(pool.workerNodes.length).toBe(numberOfWorkers)
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
877 }
878 await pool.destroy()
879 })
880
881 it('Verify that pool execute() arguments are checked', async () => {
882 const pool = new FixedClusterPool(
883 numberOfWorkers,
884 './tests/worker-files/cluster/testWorker.cjs'
885 )
886 await expect(pool.execute(undefined, 0)).rejects.toThrow(
887 new TypeError('name argument must be a string')
888 )
889 await expect(pool.execute(undefined, '')).rejects.toThrow(
890 new TypeError('name argument must not be an empty string')
891 )
892 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
893 new TypeError('transferList argument must be an array')
894 )
895 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
896 "Task function 'unknown' not found"
897 )
898 await pool.destroy()
899 await expect(pool.execute()).rejects.toThrow(
900 new Error('Cannot execute a task on not started pool')
901 )
902 })
903
904 it('Verify that pool worker tasks usage are computed', async () => {
905 const pool = new FixedClusterPool(
906 numberOfWorkers,
907 './tests/worker-files/cluster/testWorker.cjs'
908 )
909 const promises = new Set()
910 const maxMultiplier = 2
911 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
912 promises.add(pool.execute())
913 }
914 for (const workerNode of pool.workerNodes) {
915 expect(workerNode.usage).toStrictEqual({
916 tasks: {
917 executed: 0,
918 executing: maxMultiplier,
919 queued: 0,
920 maxQueued: 0,
921 sequentiallyStolen: 0,
922 stolen: 0,
923 failed: 0
924 },
925 runTime: {
926 history: expect.any(CircularArray)
927 },
928 waitTime: {
929 history: expect.any(CircularArray)
930 },
931 elu: {
932 idle: {
933 history: expect.any(CircularArray)
934 },
935 active: {
936 history: expect.any(CircularArray)
937 }
938 }
939 })
940 }
941 await Promise.all(promises)
942 for (const workerNode of pool.workerNodes) {
943 expect(workerNode.usage).toStrictEqual({
944 tasks: {
945 executed: maxMultiplier,
946 executing: 0,
947 queued: 0,
948 maxQueued: 0,
949 sequentiallyStolen: 0,
950 stolen: 0,
951 failed: 0
952 },
953 runTime: {
954 history: expect.any(CircularArray)
955 },
956 waitTime: {
957 history: expect.any(CircularArray)
958 },
959 elu: {
960 idle: {
961 history: expect.any(CircularArray)
962 },
963 active: {
964 history: expect.any(CircularArray)
965 }
966 }
967 })
968 }
969 await pool.destroy()
970 })
971
972 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
973 const pool = new DynamicThreadPool(
974 Math.floor(numberOfWorkers / 2),
975 numberOfWorkers,
976 './tests/worker-files/thread/testWorker.mjs'
977 )
978 const promises = new Set()
979 const maxMultiplier = 2
980 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
981 promises.add(pool.execute())
982 }
983 await Promise.all(promises)
984 for (const workerNode of pool.workerNodes) {
985 expect(workerNode.usage).toStrictEqual({
986 tasks: {
987 executed: expect.any(Number),
988 executing: 0,
989 queued: 0,
990 maxQueued: 0,
991 sequentiallyStolen: 0,
992 stolen: 0,
993 failed: 0
994 },
995 runTime: {
996 history: expect.any(CircularArray)
997 },
998 waitTime: {
999 history: expect.any(CircularArray)
1000 },
1001 elu: {
1002 idle: {
1003 history: expect.any(CircularArray)
1004 },
1005 active: {
1006 history: expect.any(CircularArray)
1007 }
1008 }
1009 })
1010 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1011 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1012 numberOfWorkers * maxMultiplier
1013 )
1014 expect(workerNode.usage.runTime.history.length).toBe(0)
1015 expect(workerNode.usage.waitTime.history.length).toBe(0)
1016 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1017 expect(workerNode.usage.elu.active.history.length).toBe(0)
1018 }
1019 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1020 for (const workerNode of pool.workerNodes) {
1021 expect(workerNode.usage).toStrictEqual({
1022 tasks: {
1023 executed: 0,
1024 executing: 0,
1025 queued: 0,
1026 maxQueued: 0,
1027 sequentiallyStolen: 0,
1028 stolen: 0,
1029 failed: 0
1030 },
1031 runTime: {
1032 history: expect.any(CircularArray)
1033 },
1034 waitTime: {
1035 history: expect.any(CircularArray)
1036 },
1037 elu: {
1038 idle: {
1039 history: expect.any(CircularArray)
1040 },
1041 active: {
1042 history: expect.any(CircularArray)
1043 }
1044 }
1045 })
1046 expect(workerNode.usage.runTime.history.length).toBe(0)
1047 expect(workerNode.usage.waitTime.history.length).toBe(0)
1048 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1049 expect(workerNode.usage.elu.active.history.length).toBe(0)
1050 }
1051 await pool.destroy()
1052 })
1053
1054 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1055 const pool = new DynamicClusterPool(
1056 Math.floor(numberOfWorkers / 2),
1057 numberOfWorkers,
1058 './tests/worker-files/cluster/testWorker.cjs'
1059 )
1060 expect(pool.emitter.eventNames()).toStrictEqual([])
1061 let poolInfo
1062 let poolReady = 0
1063 pool.emitter.on(PoolEvents.ready, info => {
1064 ++poolReady
1065 poolInfo = info
1066 })
1067 await waitPoolEvents(pool, PoolEvents.ready, 1)
1068 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1069 expect(poolReady).toBe(1)
1070 expect(poolInfo).toStrictEqual({
1071 version,
1072 type: PoolTypes.dynamic,
1073 worker: WorkerTypes.cluster,
1074 started: true,
1075 ready: true,
1076 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1077 minSize: expect.any(Number),
1078 maxSize: expect.any(Number),
1079 workerNodes: expect.any(Number),
1080 idleWorkerNodes: expect.any(Number),
1081 busyWorkerNodes: expect.any(Number),
1082 executedTasks: expect.any(Number),
1083 executingTasks: expect.any(Number),
1084 failedTasks: expect.any(Number)
1085 })
1086 await pool.destroy()
1087 })
1088
1089 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1090 const pool = new FixedThreadPool(
1091 numberOfWorkers,
1092 './tests/worker-files/thread/testWorker.mjs'
1093 )
1094 expect(pool.emitter.eventNames()).toStrictEqual([])
1095 const promises = new Set()
1096 let poolBusy = 0
1097 let poolInfo
1098 pool.emitter.on(PoolEvents.busy, info => {
1099 ++poolBusy
1100 poolInfo = info
1101 })
1102 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1103 for (let i = 0; i < numberOfWorkers * 2; i++) {
1104 promises.add(pool.execute())
1105 }
1106 await Promise.all(promises)
1107 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1108 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1109 expect(poolBusy).toBe(numberOfWorkers + 1)
1110 expect(poolInfo).toStrictEqual({
1111 version,
1112 type: PoolTypes.fixed,
1113 worker: WorkerTypes.thread,
1114 started: true,
1115 ready: true,
1116 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1117 minSize: expect.any(Number),
1118 maxSize: expect.any(Number),
1119 workerNodes: expect.any(Number),
1120 idleWorkerNodes: expect.any(Number),
1121 busyWorkerNodes: expect.any(Number),
1122 executedTasks: expect.any(Number),
1123 executingTasks: expect.any(Number),
1124 failedTasks: expect.any(Number)
1125 })
1126 await pool.destroy()
1127 })
1128
1129 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1130 const pool = new DynamicThreadPool(
1131 Math.floor(numberOfWorkers / 2),
1132 numberOfWorkers,
1133 './tests/worker-files/thread/testWorker.mjs'
1134 )
1135 expect(pool.emitter.eventNames()).toStrictEqual([])
1136 const promises = new Set()
1137 let poolFull = 0
1138 let poolInfo
1139 pool.emitter.on(PoolEvents.full, info => {
1140 ++poolFull
1141 poolInfo = info
1142 })
1143 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1144 for (let i = 0; i < numberOfWorkers * 2; i++) {
1145 promises.add(pool.execute())
1146 }
1147 await Promise.all(promises)
1148 expect(poolFull).toBe(1)
1149 expect(poolInfo).toStrictEqual({
1150 version,
1151 type: PoolTypes.dynamic,
1152 worker: WorkerTypes.thread,
1153 started: true,
1154 ready: true,
1155 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1156 minSize: expect.any(Number),
1157 maxSize: expect.any(Number),
1158 workerNodes: expect.any(Number),
1159 idleWorkerNodes: expect.any(Number),
1160 busyWorkerNodes: expect.any(Number),
1161 executedTasks: expect.any(Number),
1162 executingTasks: expect.any(Number),
1163 failedTasks: expect.any(Number)
1164 })
1165 await pool.destroy()
1166 })
1167
1168 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1169 const pool = new FixedThreadPool(
1170 numberOfWorkers,
1171 './tests/worker-files/thread/testWorker.mjs',
1172 {
1173 enableTasksQueue: true
1174 }
1175 )
1176 stub(pool, 'hasBackPressure').returns(true)
1177 expect(pool.emitter.eventNames()).toStrictEqual([])
1178 const promises = new Set()
1179 let poolBackPressure = 0
1180 let poolInfo
1181 pool.emitter.on(PoolEvents.backPressure, info => {
1182 ++poolBackPressure
1183 poolInfo = info
1184 })
1185 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1186 for (let i = 0; i < numberOfWorkers + 1; i++) {
1187 promises.add(pool.execute())
1188 }
1189 await Promise.all(promises)
1190 expect(poolBackPressure).toBe(1)
1191 expect(poolInfo).toStrictEqual({
1192 version,
1193 type: PoolTypes.fixed,
1194 worker: WorkerTypes.thread,
1195 started: true,
1196 ready: true,
1197 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1198 minSize: expect.any(Number),
1199 maxSize: expect.any(Number),
1200 workerNodes: expect.any(Number),
1201 idleWorkerNodes: expect.any(Number),
1202 stealingWorkerNodes: expect.any(Number),
1203 busyWorkerNodes: expect.any(Number),
1204 executedTasks: expect.any(Number),
1205 executingTasks: expect.any(Number),
1206 maxQueuedTasks: expect.any(Number),
1207 queuedTasks: expect.any(Number),
1208 backPressure: true,
1209 stolenTasks: expect.any(Number),
1210 failedTasks: expect.any(Number)
1211 })
1212 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1213 await pool.destroy()
1214 })
1215
1216 it('Verify that destroy() waits for queued tasks to finish', async () => {
1217 const tasksFinishedTimeout = 2500
1218 const pool = new FixedThreadPool(
1219 numberOfWorkers,
1220 './tests/worker-files/thread/asyncWorker.mjs',
1221 {
1222 enableTasksQueue: true,
1223 tasksQueueOptions: { tasksFinishedTimeout }
1224 }
1225 )
1226 const maxMultiplier = 4
1227 let tasksFinished = 0
1228 for (const workerNode of pool.workerNodes) {
1229 workerNode.on('taskFinished', () => {
1230 ++tasksFinished
1231 })
1232 }
1233 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1234 pool.execute()
1235 }
1236 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1237 const startTime = performance.now()
1238 await pool.destroy()
1239 const elapsedTime = performance.now() - startTime
1240 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1241 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1242 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
1243 })
1244
1245 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1246 const tasksFinishedTimeout = 1000
1247 const pool = new FixedThreadPool(
1248 numberOfWorkers,
1249 './tests/worker-files/thread/asyncWorker.mjs',
1250 {
1251 enableTasksQueue: true,
1252 tasksQueueOptions: { tasksFinishedTimeout }
1253 }
1254 )
1255 const maxMultiplier = 4
1256 let tasksFinished = 0
1257 for (const workerNode of pool.workerNodes) {
1258 workerNode.on('taskFinished', () => {
1259 ++tasksFinished
1260 })
1261 }
1262 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1263 pool.execute()
1264 }
1265 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1266 const startTime = performance.now()
1267 await pool.destroy()
1268 const elapsedTime = performance.now() - startTime
1269 expect(tasksFinished).toBe(0)
1270 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1271 })
1272
1273 it('Verify that pool asynchronous resource track tasks execution', async () => {
1274 let taskAsyncId
1275 let initCalls = 0
1276 let beforeCalls = 0
1277 let afterCalls = 0
1278 let resolveCalls = 0
1279 const hook = createHook({
1280 init (asyncId, type) {
1281 if (type === 'poolifier:task') {
1282 initCalls++
1283 taskAsyncId = asyncId
1284 }
1285 },
1286 before (asyncId) {
1287 if (asyncId === taskAsyncId) beforeCalls++
1288 },
1289 after (asyncId) {
1290 if (asyncId === taskAsyncId) afterCalls++
1291 },
1292 promiseResolve () {
1293 if (executionAsyncId() === taskAsyncId) resolveCalls++
1294 }
1295 })
1296 const pool = new FixedThreadPool(
1297 numberOfWorkers,
1298 './tests/worker-files/thread/testWorker.mjs'
1299 )
1300 hook.enable()
1301 await pool.execute()
1302 hook.disable()
1303 expect(initCalls).toBe(1)
1304 expect(beforeCalls).toBe(1)
1305 expect(afterCalls).toBe(1)
1306 expect(resolveCalls).toBe(1)
1307 await pool.destroy()
1308 })
1309
1310 it('Verify that hasTaskFunction() is working', async () => {
1311 const dynamicThreadPool = new DynamicThreadPool(
1312 Math.floor(numberOfWorkers / 2),
1313 numberOfWorkers,
1314 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1315 )
1316 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1317 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1318 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1319 true
1320 )
1321 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1322 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1324 await dynamicThreadPool.destroy()
1325 const fixedClusterPool = new FixedClusterPool(
1326 numberOfWorkers,
1327 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1328 )
1329 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1330 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1331 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1332 true
1333 )
1334 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1335 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1337 await fixedClusterPool.destroy()
1338 })
1339
1340 it('Verify that addTaskFunction() is working', async () => {
1341 const dynamicThreadPool = new DynamicThreadPool(
1342 Math.floor(numberOfWorkers / 2),
1343 numberOfWorkers,
1344 './tests/worker-files/thread/testWorker.mjs'
1345 )
1346 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1347 await expect(
1348 dynamicThreadPool.addTaskFunction(0, () => {})
1349 ).rejects.toThrow(new TypeError('name argument must be a string'))
1350 await expect(
1351 dynamicThreadPool.addTaskFunction('', () => {})
1352 ).rejects.toThrow(
1353 new TypeError('name argument must not be an empty string')
1354 )
1355 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1356 new TypeError('fn argument must be a function')
1357 )
1358 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1359 new TypeError('fn argument must be a function')
1360 )
1361 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1362 DEFAULT_TASK_NAME,
1363 'test'
1364 ])
1365 const echoTaskFunction = data => {
1366 return data
1367 }
1368 await expect(
1369 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1370 ).resolves.toBe(true)
1371 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1372 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1373 echoTaskFunction
1374 )
1375 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1376 DEFAULT_TASK_NAME,
1377 'test',
1378 'echo'
1379 ])
1380 const taskFunctionData = { test: 'test' }
1381 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1382 expect(echoResult).toStrictEqual(taskFunctionData)
1383 for (const workerNode of dynamicThreadPool.workerNodes) {
1384 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1385 tasks: {
1386 executed: expect.any(Number),
1387 executing: 0,
1388 queued: 0,
1389 sequentiallyStolen: 0,
1390 stolen: 0,
1391 failed: 0
1392 },
1393 runTime: {
1394 history: new CircularArray()
1395 },
1396 waitTime: {
1397 history: new CircularArray()
1398 },
1399 elu: {
1400 idle: {
1401 history: new CircularArray()
1402 },
1403 active: {
1404 history: new CircularArray()
1405 }
1406 }
1407 })
1408 }
1409 await dynamicThreadPool.destroy()
1410 })
1411
1412 it('Verify that removeTaskFunction() is working', async () => {
1413 const dynamicThreadPool = new DynamicThreadPool(
1414 Math.floor(numberOfWorkers / 2),
1415 numberOfWorkers,
1416 './tests/worker-files/thread/testWorker.mjs'
1417 )
1418 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1419 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1420 DEFAULT_TASK_NAME,
1421 'test'
1422 ])
1423 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1424 new Error('Cannot remove a task function not handled on the pool side')
1425 )
1426 const echoTaskFunction = data => {
1427 return data
1428 }
1429 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1430 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1431 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1432 echoTaskFunction
1433 )
1434 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1435 DEFAULT_TASK_NAME,
1436 'test',
1437 'echo'
1438 ])
1439 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1440 true
1441 )
1442 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1443 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1445 DEFAULT_TASK_NAME,
1446 'test'
1447 ])
1448 await dynamicThreadPool.destroy()
1449 })
1450
1451 it('Verify that listTaskFunctionNames() is working', async () => {
1452 const dynamicThreadPool = new DynamicThreadPool(
1453 Math.floor(numberOfWorkers / 2),
1454 numberOfWorkers,
1455 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1456 )
1457 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1458 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1459 DEFAULT_TASK_NAME,
1460 'jsonIntegerSerialization',
1461 'factorial',
1462 'fibonacci'
1463 ])
1464 await dynamicThreadPool.destroy()
1465 const fixedClusterPool = new FixedClusterPool(
1466 numberOfWorkers,
1467 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1468 )
1469 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1470 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1471 DEFAULT_TASK_NAME,
1472 'jsonIntegerSerialization',
1473 'factorial',
1474 'fibonacci'
1475 ])
1476 await fixedClusterPool.destroy()
1477 })
1478
1479 it('Verify that setDefaultTaskFunction() is working', async () => {
1480 const dynamicThreadPool = new DynamicThreadPool(
1481 Math.floor(numberOfWorkers / 2),
1482 numberOfWorkers,
1483 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1484 )
1485 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1486 const workerId = dynamicThreadPool.workerNodes[0].info.id
1487 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1488 new Error(
1489 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1490 )
1491 )
1492 await expect(
1493 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1494 ).rejects.toThrow(
1495 new Error(
1496 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1497 )
1498 )
1499 await expect(
1500 dynamicThreadPool.setDefaultTaskFunction('unknown')
1501 ).rejects.toThrow(
1502 new Error(
1503 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1504 )
1505 )
1506 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1507 DEFAULT_TASK_NAME,
1508 'jsonIntegerSerialization',
1509 'factorial',
1510 'fibonacci'
1511 ])
1512 await expect(
1513 dynamicThreadPool.setDefaultTaskFunction('factorial')
1514 ).resolves.toBe(true)
1515 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1516 DEFAULT_TASK_NAME,
1517 'factorial',
1518 'jsonIntegerSerialization',
1519 'fibonacci'
1520 ])
1521 await expect(
1522 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1523 ).resolves.toBe(true)
1524 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 DEFAULT_TASK_NAME,
1526 'fibonacci',
1527 'jsonIntegerSerialization',
1528 'factorial'
1529 ])
1530 await dynamicThreadPool.destroy()
1531 })
1532
1533 it('Verify that multiple task functions worker is working', async () => {
1534 const pool = new DynamicClusterPool(
1535 Math.floor(numberOfWorkers / 2),
1536 numberOfWorkers,
1537 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1538 )
1539 const data = { n: 10 }
1540 const result0 = await pool.execute(data)
1541 expect(result0).toStrictEqual({ ok: 1 })
1542 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1543 expect(result1).toStrictEqual({ ok: 1 })
1544 const result2 = await pool.execute(data, 'factorial')
1545 expect(result2).toBe(3628800)
1546 const result3 = await pool.execute(data, 'fibonacci')
1547 expect(result3).toBe(55)
1548 expect(pool.info.executingTasks).toBe(0)
1549 expect(pool.info.executedTasks).toBe(4)
1550 for (const workerNode of pool.workerNodes) {
1551 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1552 DEFAULT_TASK_NAME,
1553 'jsonIntegerSerialization',
1554 'factorial',
1555 'fibonacci'
1556 ])
1557 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1558 for (const name of pool.listTaskFunctionNames()) {
1559 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1560 tasks: {
1561 executed: expect.any(Number),
1562 executing: 0,
1563 failed: 0,
1564 queued: 0,
1565 sequentiallyStolen: 0,
1566 stolen: 0
1567 },
1568 runTime: {
1569 history: expect.any(CircularArray)
1570 },
1571 waitTime: {
1572 history: expect.any(CircularArray)
1573 },
1574 elu: {
1575 idle: {
1576 history: expect.any(CircularArray)
1577 },
1578 active: {
1579 history: expect.any(CircularArray)
1580 }
1581 }
1582 })
1583 expect(
1584 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1585 ).toBeGreaterThan(0)
1586 }
1587 expect(
1588 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1589 ).toStrictEqual(
1590 workerNode.getTaskFunctionWorkerUsage(
1591 workerNode.info.taskFunctionNames[1]
1592 )
1593 )
1594 }
1595 await pool.destroy()
1596 })
1597
1598 it('Verify sendKillMessageToWorker()', async () => {
1599 const pool = new DynamicClusterPool(
1600 Math.floor(numberOfWorkers / 2),
1601 numberOfWorkers,
1602 './tests/worker-files/cluster/testWorker.cjs'
1603 )
1604 const workerNodeKey = 0
1605 await expect(
1606 pool.sendKillMessageToWorker(workerNodeKey)
1607 ).resolves.toBeUndefined()
1608 await pool.destroy()
1609 })
1610
1611 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1612 const pool = new DynamicClusterPool(
1613 Math.floor(numberOfWorkers / 2),
1614 numberOfWorkers,
1615 './tests/worker-files/cluster/testWorker.cjs'
1616 )
1617 const workerNodeKey = 0
1618 await expect(
1619 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1620 taskFunctionOperation: 'add',
1621 taskFunctionName: 'empty',
1622 taskFunction: (() => {}).toString()
1623 })
1624 ).resolves.toBe(true)
1625 expect(
1626 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1627 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1628 await pool.destroy()
1629 })
1630
1631 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1632 const pool = new DynamicClusterPool(
1633 Math.floor(numberOfWorkers / 2),
1634 numberOfWorkers,
1635 './tests/worker-files/cluster/testWorker.cjs'
1636 )
1637 await expect(
1638 pool.sendTaskFunctionOperationToWorkers({
1639 taskFunctionOperation: 'add',
1640 taskFunctionName: 'empty',
1641 taskFunction: (() => {}).toString()
1642 })
1643 ).resolves.toBe(true)
1644 for (const workerNode of pool.workerNodes) {
1645 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1646 DEFAULT_TASK_NAME,
1647 'test',
1648 'empty'
1649 ])
1650 }
1651 await pool.destroy()
1652 })
1653 })