89cc8995033c12c55f64e4cf8f1c7b1d59d66f42
[poolifier.git] / tests / pools / abstract-pool.test.mjs
1 import { createHook, executionAsyncId } from 'node:async_hooks'
2 import { EventEmitterAsyncResource } from 'node:events'
3 import { readFileSync } from 'node:fs'
4 import { dirname, join } from 'node:path'
5 import { fileURLToPath } from 'node:url'
6
7 import { expect } from 'expect'
8 import { restore, stub } from 'sinon'
9
10 import { CircularArray } from '../../lib/circular-array.cjs'
11 import { Deque } from '../../lib/deque.cjs'
12 import {
13 DynamicClusterPool,
14 DynamicThreadPool,
15 FixedClusterPool,
16 FixedThreadPool,
17 PoolEvents,
18 PoolTypes,
19 WorkerChoiceStrategies,
20 WorkerTypes
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24 import { waitPoolEvents } from '../test-utils.cjs'
25
26 describe('Abstract pool test suite', () => {
27 const version = JSON.parse(
28 readFileSync(
29 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
30 'utf8'
31 )
32 ).version
33 const numberOfWorkers = 2
34 class StubPoolWithIsMain extends FixedThreadPool {
35 isMain () {
36 return false
37 }
38 }
39
40 afterEach(() => {
41 restore()
42 })
43
44 it('Verify that pool can be created and destroyed', async () => {
45 const pool = new FixedThreadPool(
46 numberOfWorkers,
47 './tests/worker-files/thread/testWorker.mjs'
48 )
49 expect(pool).toBeInstanceOf(FixedThreadPool)
50 await pool.destroy()
51 })
52
53 it('Verify that pool cannot be created from a non main thread/process', () => {
54 expect(
55 () =>
56 new StubPoolWithIsMain(
57 numberOfWorkers,
58 './tests/worker-files/thread/testWorker.mjs',
59 {
60 errorHandler: e => console.error(e)
61 }
62 )
63 ).toThrow(
64 new Error(
65 'Cannot start a pool from a worker with the same type as the pool'
66 )
67 )
68 })
69
70 it('Verify that pool statuses properties are set', async () => {
71 const pool = new FixedThreadPool(
72 numberOfWorkers,
73 './tests/worker-files/thread/testWorker.mjs'
74 )
75 expect(pool.started).toBe(true)
76 expect(pool.starting).toBe(false)
77 expect(pool.destroying).toBe(false)
78 await pool.destroy()
79 })
80
81 it('Verify that filePath is checked', () => {
82 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
83 new TypeError('The worker file path must be specified')
84 )
85 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
86 new TypeError('The worker file path must be a string')
87 )
88 expect(
89 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
90 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 })
92
93 it('Verify that numberOfWorkers is checked', () => {
94 expect(
95 () =>
96 new FixedThreadPool(
97 undefined,
98 './tests/worker-files/thread/testWorker.mjs'
99 )
100 ).toThrow(
101 new Error(
102 'Cannot instantiate a pool without specifying the number of workers'
103 )
104 )
105 })
106
107 it('Verify that a negative number of workers is checked', () => {
108 expect(
109 () =>
110 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
111 ).toThrow(
112 new RangeError(
113 'Cannot instantiate a pool with a negative number of workers'
114 )
115 )
116 })
117
118 it('Verify that a non integer number of workers is checked', () => {
119 expect(
120 () =>
121 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 ).toThrow(
123 new TypeError(
124 'Cannot instantiate a pool with a non safe integer number of workers'
125 )
126 )
127 })
128
129 it('Verify that pool arguments number and pool type are checked', () => {
130 expect(
131 () =>
132 new FixedThreadPool(
133 numberOfWorkers,
134 './tests/worker-files/thread/testWorker.mjs',
135 undefined,
136 numberOfWorkers * 2
137 )
138 ).toThrow(
139 new Error(
140 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
141 )
142 )
143 })
144
145 it('Verify that dynamic pool sizing is checked', () => {
146 expect(
147 () =>
148 new DynamicClusterPool(
149 1,
150 undefined,
151 './tests/worker-files/cluster/testWorker.cjs'
152 )
153 ).toThrow(
154 new TypeError(
155 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
156 )
157 )
158 expect(
159 () =>
160 new DynamicThreadPool(
161 0.5,
162 1,
163 './tests/worker-files/thread/testWorker.mjs'
164 )
165 ).toThrow(
166 new TypeError(
167 'Cannot instantiate a pool with a non safe integer number of workers'
168 )
169 )
170 expect(
171 () =>
172 new DynamicClusterPool(
173 0,
174 0.5,
175 './tests/worker-files/cluster/testWorker.cjs'
176 )
177 ).toThrow(
178 new TypeError(
179 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
180 )
181 )
182 expect(
183 () =>
184 new DynamicThreadPool(
185 2,
186 1,
187 './tests/worker-files/thread/testWorker.mjs'
188 )
189 ).toThrow(
190 new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
192 )
193 )
194 expect(
195 () =>
196 new DynamicThreadPool(
197 0,
198 0,
199 './tests/worker-files/thread/testWorker.mjs'
200 )
201 ).toThrow(
202 new RangeError(
203 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
204 )
205 )
206 expect(
207 () =>
208 new DynamicClusterPool(
209 1,
210 1,
211 './tests/worker-files/cluster/testWorker.cjs'
212 )
213 ).toThrow(
214 new RangeError(
215 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
216 )
217 )
218 })
219
220 it('Verify that pool options are checked', async () => {
221 let pool = new FixedThreadPool(
222 numberOfWorkers,
223 './tests/worker-files/thread/testWorker.mjs'
224 )
225 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
226 expect(pool.emitter.eventNames()).toStrictEqual([])
227 expect(pool.opts).toStrictEqual({
228 startWorkers: true,
229 enableEvents: true,
230 restartWorkerOnError: true,
231 enableTasksQueue: false,
232 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
233 })
234 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
235 .workerChoiceStrategies) {
236 expect(workerChoiceStrategy.opts).toStrictEqual({
237 runTime: { median: false },
238 waitTime: { median: false },
239 elu: { median: false },
240 weights: expect.objectContaining({
241 0: expect.any(Number),
242 [pool.info.maxSize - 1]: expect.any(Number)
243 })
244 })
245 }
246 await pool.destroy()
247 const testHandler = () => console.info('test handler executed')
248 pool = new FixedThreadPool(
249 numberOfWorkers,
250 './tests/worker-files/thread/testWorker.mjs',
251 {
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 runTime: { median: true },
255 weights: { 0: 300, 1: 200 }
256 },
257 enableEvents: false,
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
260 tasksQueueOptions: { concurrency: 2 },
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
265 }
266 )
267 expect(pool.emitter).toBeUndefined()
268 expect(pool.opts).toStrictEqual({
269 startWorkers: true,
270 enableEvents: false,
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
273 tasksQueueOptions: {
274 concurrency: 2,
275 size: Math.pow(numberOfWorkers, 2),
276 taskStealing: true,
277 tasksStealingOnBackPressure: true,
278 tasksFinishedTimeout: 2000
279 },
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
282 runTime: { median: true },
283 weights: { 0: 300, 1: 200 }
284 },
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
289 })
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
293 runTime: { median: true },
294 waitTime: { median: false },
295 elu: { median: false },
296 weights: { 0: 300, 1: 200 }
297 })
298 }
299 await pool.destroy()
300 })
301
302 it('Verify that pool options are validated', () => {
303 expect(
304 () =>
305 new FixedThreadPool(
306 numberOfWorkers,
307 './tests/worker-files/thread/testWorker.mjs',
308 {
309 workerChoiceStrategy: 'invalidStrategy'
310 }
311 )
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
313 expect(
314 () =>
315 new FixedThreadPool(
316 numberOfWorkers,
317 './tests/worker-files/thread/testWorker.mjs',
318 {
319 workerChoiceStrategyOptions: { weights: {} }
320 }
321 )
322 ).toThrow(
323 new Error(
324 'Invalid worker choice strategy options: must have a weight for each worker node'
325 )
326 )
327 expect(
328 () =>
329 new FixedThreadPool(
330 numberOfWorkers,
331 './tests/worker-files/thread/testWorker.mjs',
332 {
333 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
334 }
335 )
336 ).toThrow(
337 new Error(
338 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
339 )
340 )
341 expect(
342 () =>
343 new FixedThreadPool(
344 numberOfWorkers,
345 './tests/worker-files/thread/testWorker.mjs',
346 {
347 enableTasksQueue: true,
348 tasksQueueOptions: 'invalidTasksQueueOptions'
349 }
350 )
351 ).toThrow(
352 new TypeError('Invalid tasks queue options: must be a plain object')
353 )
354 expect(
355 () =>
356 new FixedThreadPool(
357 numberOfWorkers,
358 './tests/worker-files/thread/testWorker.mjs',
359 {
360 enableTasksQueue: true,
361 tasksQueueOptions: { concurrency: 0 }
362 }
363 )
364 ).toThrow(
365 new RangeError(
366 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
367 )
368 )
369 expect(
370 () =>
371 new FixedThreadPool(
372 numberOfWorkers,
373 './tests/worker-files/thread/testWorker.mjs',
374 {
375 enableTasksQueue: true,
376 tasksQueueOptions: { concurrency: -1 }
377 }
378 )
379 ).toThrow(
380 new RangeError(
381 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
382 )
383 )
384 expect(
385 () =>
386 new FixedThreadPool(
387 numberOfWorkers,
388 './tests/worker-files/thread/testWorker.mjs',
389 {
390 enableTasksQueue: true,
391 tasksQueueOptions: { concurrency: 0.2 }
392 }
393 )
394 ).toThrow(
395 new TypeError('Invalid worker node tasks concurrency: must be an integer')
396 )
397 expect(
398 () =>
399 new FixedThreadPool(
400 numberOfWorkers,
401 './tests/worker-files/thread/testWorker.mjs',
402 {
403 enableTasksQueue: true,
404 tasksQueueOptions: { size: 0 }
405 }
406 )
407 ).toThrow(
408 new RangeError(
409 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
410 )
411 )
412 expect(
413 () =>
414 new FixedThreadPool(
415 numberOfWorkers,
416 './tests/worker-files/thread/testWorker.mjs',
417 {
418 enableTasksQueue: true,
419 tasksQueueOptions: { size: -1 }
420 }
421 )
422 ).toThrow(
423 new RangeError(
424 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
425 )
426 )
427 expect(
428 () =>
429 new FixedThreadPool(
430 numberOfWorkers,
431 './tests/worker-files/thread/testWorker.mjs',
432 {
433 enableTasksQueue: true,
434 tasksQueueOptions: { size: 0.2 }
435 }
436 )
437 ).toThrow(
438 new TypeError('Invalid worker node tasks queue size: must be an integer')
439 )
440 })
441
442 it('Verify that pool worker choice strategy options can be set', async () => {
443 const pool = new FixedThreadPool(
444 numberOfWorkers,
445 './tests/worker-files/thread/testWorker.mjs',
446 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
447 )
448 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
449 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
450 .workerChoiceStrategies) {
451 expect(workerChoiceStrategy.opts).toStrictEqual({
452 runTime: { median: false },
453 waitTime: { median: false },
454 elu: { median: false },
455 weights: expect.objectContaining({
456 0: expect.any(Number),
457 [pool.info.maxSize - 1]: expect.any(Number)
458 })
459 })
460 }
461 expect(
462 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
463 ).toStrictEqual({
464 runTime: {
465 aggregate: true,
466 average: true,
467 median: false
468 },
469 waitTime: {
470 aggregate: false,
471 average: false,
472 median: false
473 },
474 elu: {
475 aggregate: true,
476 average: true,
477 median: false
478 }
479 })
480 pool.setWorkerChoiceStrategyOptions({
481 runTime: { median: true },
482 elu: { median: true }
483 })
484 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
485 runTime: { median: true },
486 elu: { median: true }
487 })
488 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
489 .workerChoiceStrategies) {
490 expect(workerChoiceStrategy.opts).toStrictEqual({
491 runTime: { median: true },
492 waitTime: { median: false },
493 elu: { median: true },
494 weights: expect.objectContaining({
495 0: expect.any(Number),
496 [pool.info.maxSize - 1]: expect.any(Number)
497 })
498 })
499 }
500 expect(
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
502 ).toStrictEqual({
503 runTime: {
504 aggregate: true,
505 average: false,
506 median: true
507 },
508 waitTime: {
509 aggregate: false,
510 average: false,
511 median: false
512 },
513 elu: {
514 aggregate: true,
515 average: false,
516 median: true
517 }
518 })
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: false },
521 elu: { median: false }
522 })
523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
524 runTime: { median: false },
525 elu: { median: false }
526 })
527 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
528 .workerChoiceStrategies) {
529 expect(workerChoiceStrategy.opts).toStrictEqual({
530 runTime: { median: false },
531 waitTime: { median: false },
532 elu: { median: false },
533 weights: expect.objectContaining({
534 0: expect.any(Number),
535 [pool.info.maxSize - 1]: expect.any(Number)
536 })
537 })
538 }
539 expect(
540 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
541 ).toStrictEqual({
542 runTime: {
543 aggregate: true,
544 average: true,
545 median: false
546 },
547 waitTime: {
548 aggregate: false,
549 average: false,
550 median: false
551 },
552 elu: {
553 aggregate: true,
554 average: true,
555 median: false
556 }
557 })
558 expect(() =>
559 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
560 ).toThrow(
561 new TypeError(
562 'Invalid worker choice strategy options: must be a plain object'
563 )
564 )
565 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
566 new Error(
567 'Invalid worker choice strategy options: must have a weight for each worker node'
568 )
569 )
570 expect(() =>
571 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
572 ).toThrow(
573 new Error(
574 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
575 )
576 )
577 await pool.destroy()
578 })
579
580 it('Verify that pool tasks queue can be enabled/disabled', async () => {
581 const pool = new FixedThreadPool(
582 numberOfWorkers,
583 './tests/worker-files/thread/testWorker.mjs'
584 )
585 expect(pool.opts.enableTasksQueue).toBe(false)
586 expect(pool.opts.tasksQueueOptions).toBeUndefined()
587 pool.enableTasksQueue(true)
588 expect(pool.opts.enableTasksQueue).toBe(true)
589 expect(pool.opts.tasksQueueOptions).toStrictEqual({
590 concurrency: 1,
591 size: Math.pow(numberOfWorkers, 2),
592 taskStealing: true,
593 tasksStealingOnBackPressure: true,
594 tasksFinishedTimeout: 2000
595 })
596 pool.enableTasksQueue(true, { concurrency: 2 })
597 expect(pool.opts.enableTasksQueue).toBe(true)
598 expect(pool.opts.tasksQueueOptions).toStrictEqual({
599 concurrency: 2,
600 size: Math.pow(numberOfWorkers, 2),
601 taskStealing: true,
602 tasksStealingOnBackPressure: true,
603 tasksFinishedTimeout: 2000
604 })
605 pool.enableTasksQueue(false)
606 expect(pool.opts.enableTasksQueue).toBe(false)
607 expect(pool.opts.tasksQueueOptions).toBeUndefined()
608 await pool.destroy()
609 })
610
611 it('Verify that pool tasks queue options can be set', async () => {
612 const pool = new FixedThreadPool(
613 numberOfWorkers,
614 './tests/worker-files/thread/testWorker.mjs',
615 { enableTasksQueue: true }
616 )
617 expect(pool.opts.tasksQueueOptions).toStrictEqual({
618 concurrency: 1,
619 size: Math.pow(numberOfWorkers, 2),
620 taskStealing: true,
621 tasksStealingOnBackPressure: true,
622 tasksFinishedTimeout: 2000
623 })
624 for (const workerNode of pool.workerNodes) {
625 expect(workerNode.tasksQueueBackPressureSize).toBe(
626 pool.opts.tasksQueueOptions.size
627 )
628 }
629 pool.setTasksQueueOptions({
630 concurrency: 2,
631 size: 2,
632 taskStealing: false,
633 tasksStealingOnBackPressure: false,
634 tasksFinishedTimeout: 3000
635 })
636 expect(pool.opts.tasksQueueOptions).toStrictEqual({
637 concurrency: 2,
638 size: 2,
639 taskStealing: false,
640 tasksStealingOnBackPressure: false,
641 tasksFinishedTimeout: 3000
642 })
643 for (const workerNode of pool.workerNodes) {
644 expect(workerNode.tasksQueueBackPressureSize).toBe(
645 pool.opts.tasksQueueOptions.size
646 )
647 }
648 pool.setTasksQueueOptions({
649 concurrency: 1,
650 taskStealing: true,
651 tasksStealingOnBackPressure: true
652 })
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
654 concurrency: 1,
655 size: Math.pow(numberOfWorkers, 2),
656 taskStealing: true,
657 tasksStealingOnBackPressure: true,
658 tasksFinishedTimeout: 2000
659 })
660 for (const workerNode of pool.workerNodes) {
661 expect(workerNode.tasksQueueBackPressureSize).toBe(
662 pool.opts.tasksQueueOptions.size
663 )
664 }
665 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
666 new TypeError('Invalid tasks queue options: must be a plain object')
667 )
668 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
669 new RangeError(
670 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
671 )
672 )
673 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
674 new RangeError(
675 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
676 )
677 )
678 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
679 new TypeError('Invalid worker node tasks concurrency: must be an integer')
680 )
681 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
682 new RangeError(
683 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
684 )
685 )
686 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
687 new RangeError(
688 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
689 )
690 )
691 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
692 new TypeError('Invalid worker node tasks queue size: must be an integer')
693 )
694 await pool.destroy()
695 })
696
697 it('Verify that pool info is set', async () => {
698 let pool = new FixedThreadPool(
699 numberOfWorkers,
700 './tests/worker-files/thread/testWorker.mjs'
701 )
702 expect(pool.info).toStrictEqual({
703 version,
704 type: PoolTypes.fixed,
705 worker: WorkerTypes.thread,
706 started: true,
707 ready: true,
708 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
709 strategyRetries: 0,
710 minSize: numberOfWorkers,
711 maxSize: numberOfWorkers,
712 workerNodes: numberOfWorkers,
713 idleWorkerNodes: numberOfWorkers,
714 busyWorkerNodes: 0,
715 executedTasks: 0,
716 executingTasks: 0,
717 failedTasks: 0
718 })
719 await pool.destroy()
720 pool = new DynamicClusterPool(
721 Math.floor(numberOfWorkers / 2),
722 numberOfWorkers,
723 './tests/worker-files/cluster/testWorker.cjs'
724 )
725 expect(pool.info).toStrictEqual({
726 version,
727 type: PoolTypes.dynamic,
728 worker: WorkerTypes.cluster,
729 started: true,
730 ready: true,
731 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
732 strategyRetries: 0,
733 minSize: Math.floor(numberOfWorkers / 2),
734 maxSize: numberOfWorkers,
735 workerNodes: Math.floor(numberOfWorkers / 2),
736 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
737 busyWorkerNodes: 0,
738 executedTasks: 0,
739 executingTasks: 0,
740 failedTasks: 0
741 })
742 await pool.destroy()
743 })
744
745 it('Verify that pool worker tasks usage are initialized', async () => {
746 const pool = new FixedClusterPool(
747 numberOfWorkers,
748 './tests/worker-files/cluster/testWorker.cjs'
749 )
750 for (const workerNode of pool.workerNodes) {
751 expect(workerNode).toBeInstanceOf(WorkerNode)
752 expect(workerNode.usage).toStrictEqual({
753 tasks: {
754 executed: 0,
755 executing: 0,
756 queued: 0,
757 maxQueued: 0,
758 sequentiallyStolen: 0,
759 stolen: 0,
760 failed: 0
761 },
762 runTime: {
763 history: new CircularArray()
764 },
765 waitTime: {
766 history: new CircularArray()
767 },
768 elu: {
769 idle: {
770 history: new CircularArray()
771 },
772 active: {
773 history: new CircularArray()
774 }
775 }
776 })
777 }
778 await pool.destroy()
779 })
780
781 it('Verify that pool worker tasks queue are initialized', async () => {
782 let pool = new FixedClusterPool(
783 numberOfWorkers,
784 './tests/worker-files/cluster/testWorker.cjs'
785 )
786 for (const workerNode of pool.workerNodes) {
787 expect(workerNode).toBeInstanceOf(WorkerNode)
788 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
789 expect(workerNode.tasksQueue.size).toBe(0)
790 expect(workerNode.tasksQueue.maxSize).toBe(0)
791 }
792 await pool.destroy()
793 pool = new DynamicThreadPool(
794 Math.floor(numberOfWorkers / 2),
795 numberOfWorkers,
796 './tests/worker-files/thread/testWorker.mjs'
797 )
798 for (const workerNode of pool.workerNodes) {
799 expect(workerNode).toBeInstanceOf(WorkerNode)
800 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
801 expect(workerNode.tasksQueue.size).toBe(0)
802 expect(workerNode.tasksQueue.maxSize).toBe(0)
803 }
804 await pool.destroy()
805 })
806
807 it('Verify that pool worker info are initialized', async () => {
808 let pool = new FixedClusterPool(
809 numberOfWorkers,
810 './tests/worker-files/cluster/testWorker.cjs'
811 )
812 for (const workerNode of pool.workerNodes) {
813 expect(workerNode).toBeInstanceOf(WorkerNode)
814 expect(workerNode.info).toStrictEqual({
815 id: expect.any(Number),
816 type: WorkerTypes.cluster,
817 dynamic: false,
818 ready: true,
819 stealing: false
820 })
821 }
822 await pool.destroy()
823 pool = new DynamicThreadPool(
824 Math.floor(numberOfWorkers / 2),
825 numberOfWorkers,
826 './tests/worker-files/thread/testWorker.mjs'
827 )
828 for (const workerNode of pool.workerNodes) {
829 expect(workerNode).toBeInstanceOf(WorkerNode)
830 expect(workerNode.info).toStrictEqual({
831 id: expect.any(Number),
832 type: WorkerTypes.thread,
833 dynamic: false,
834 ready: true,
835 stealing: false
836 })
837 }
838 await pool.destroy()
839 })
840
841 it('Verify that pool statuses are checked at start or destroy', async () => {
842 const pool = new FixedThreadPool(
843 numberOfWorkers,
844 './tests/worker-files/thread/testWorker.mjs'
845 )
846 expect(pool.info.started).toBe(true)
847 expect(pool.info.ready).toBe(true)
848 expect(() => pool.start()).toThrow(
849 new Error('Cannot start an already started pool')
850 )
851 await pool.destroy()
852 expect(pool.info.started).toBe(false)
853 expect(pool.info.ready).toBe(false)
854 await expect(pool.destroy()).rejects.toThrow(
855 new Error('Cannot destroy an already destroyed pool')
856 )
857 })
858
859 it('Verify that pool can be started after initialization', async () => {
860 const pool = new FixedClusterPool(
861 numberOfWorkers,
862 './tests/worker-files/cluster/testWorker.cjs',
863 {
864 startWorkers: false
865 }
866 )
867 expect(pool.info.started).toBe(false)
868 expect(pool.info.ready).toBe(false)
869 expect(pool.workerNodes).toStrictEqual([])
870 expect(pool.readyEventEmitted).toBe(false)
871 await expect(pool.execute()).rejects.toThrow(
872 new Error('Cannot execute a task on not started pool')
873 )
874 pool.start()
875 expect(pool.info.started).toBe(true)
876 expect(pool.info.ready).toBe(true)
877 await waitPoolEvents(pool, PoolEvents.ready, 1)
878 expect(pool.readyEventEmitted).toBe(true)
879 expect(pool.workerNodes.length).toBe(numberOfWorkers)
880 for (const workerNode of pool.workerNodes) {
881 expect(workerNode).toBeInstanceOf(WorkerNode)
882 }
883 await pool.destroy()
884 })
885
886 it('Verify that pool execute() arguments are checked', async () => {
887 const pool = new FixedClusterPool(
888 numberOfWorkers,
889 './tests/worker-files/cluster/testWorker.cjs'
890 )
891 await expect(pool.execute(undefined, 0)).rejects.toThrow(
892 new TypeError('name argument must be a string')
893 )
894 await expect(pool.execute(undefined, '')).rejects.toThrow(
895 new TypeError('name argument must not be an empty string')
896 )
897 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
898 new TypeError('transferList argument must be an array')
899 )
900 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
901 "Task function 'unknown' not found"
902 )
903 await pool.destroy()
904 await expect(pool.execute()).rejects.toThrow(
905 new Error('Cannot execute a task on not started pool')
906 )
907 })
908
909 it('Verify that pool worker tasks usage are computed', async () => {
910 const pool = new FixedClusterPool(
911 numberOfWorkers,
912 './tests/worker-files/cluster/testWorker.cjs'
913 )
914 const promises = new Set()
915 const maxMultiplier = 2
916 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
917 promises.add(pool.execute())
918 }
919 for (const workerNode of pool.workerNodes) {
920 expect(workerNode.usage).toStrictEqual({
921 tasks: {
922 executed: 0,
923 executing: maxMultiplier,
924 queued: 0,
925 maxQueued: 0,
926 sequentiallyStolen: 0,
927 stolen: 0,
928 failed: 0
929 },
930 runTime: {
931 history: expect.any(CircularArray)
932 },
933 waitTime: {
934 history: expect.any(CircularArray)
935 },
936 elu: {
937 idle: {
938 history: expect.any(CircularArray)
939 },
940 active: {
941 history: expect.any(CircularArray)
942 }
943 }
944 })
945 }
946 await Promise.all(promises)
947 for (const workerNode of pool.workerNodes) {
948 expect(workerNode.usage).toStrictEqual({
949 tasks: {
950 executed: maxMultiplier,
951 executing: 0,
952 queued: 0,
953 maxQueued: 0,
954 sequentiallyStolen: 0,
955 stolen: 0,
956 failed: 0
957 },
958 runTime: {
959 history: expect.any(CircularArray)
960 },
961 waitTime: {
962 history: expect.any(CircularArray)
963 },
964 elu: {
965 idle: {
966 history: expect.any(CircularArray)
967 },
968 active: {
969 history: expect.any(CircularArray)
970 }
971 }
972 })
973 }
974 await pool.destroy()
975 })
976
977 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
978 const pool = new DynamicThreadPool(
979 Math.floor(numberOfWorkers / 2),
980 numberOfWorkers,
981 './tests/worker-files/thread/testWorker.mjs'
982 )
983 const promises = new Set()
984 const maxMultiplier = 2
985 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
986 promises.add(pool.execute())
987 }
988 await Promise.all(promises)
989 for (const workerNode of pool.workerNodes) {
990 expect(workerNode.usage).toStrictEqual({
991 tasks: {
992 executed: expect.any(Number),
993 executing: 0,
994 queued: 0,
995 maxQueued: 0,
996 sequentiallyStolen: 0,
997 stolen: 0,
998 failed: 0
999 },
1000 runTime: {
1001 history: expect.any(CircularArray)
1002 },
1003 waitTime: {
1004 history: expect.any(CircularArray)
1005 },
1006 elu: {
1007 idle: {
1008 history: expect.any(CircularArray)
1009 },
1010 active: {
1011 history: expect.any(CircularArray)
1012 }
1013 }
1014 })
1015 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1016 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1017 numberOfWorkers * maxMultiplier
1018 )
1019 expect(workerNode.usage.runTime.history.length).toBe(0)
1020 expect(workerNode.usage.waitTime.history.length).toBe(0)
1021 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1022 expect(workerNode.usage.elu.active.history.length).toBe(0)
1023 }
1024 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1025 for (const workerNode of pool.workerNodes) {
1026 expect(workerNode.usage).toStrictEqual({
1027 tasks: {
1028 executed: 0,
1029 executing: 0,
1030 queued: 0,
1031 maxQueued: 0,
1032 sequentiallyStolen: 0,
1033 stolen: 0,
1034 failed: 0
1035 },
1036 runTime: {
1037 history: expect.any(CircularArray)
1038 },
1039 waitTime: {
1040 history: expect.any(CircularArray)
1041 },
1042 elu: {
1043 idle: {
1044 history: expect.any(CircularArray)
1045 },
1046 active: {
1047 history: expect.any(CircularArray)
1048 }
1049 }
1050 })
1051 expect(workerNode.usage.runTime.history.length).toBe(0)
1052 expect(workerNode.usage.waitTime.history.length).toBe(0)
1053 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1054 expect(workerNode.usage.elu.active.history.length).toBe(0)
1055 }
1056 await pool.destroy()
1057 })
1058
1059 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1060 const pool = new DynamicClusterPool(
1061 Math.floor(numberOfWorkers / 2),
1062 numberOfWorkers,
1063 './tests/worker-files/cluster/testWorker.cjs'
1064 )
1065 expect(pool.emitter.eventNames()).toStrictEqual([])
1066 let poolInfo
1067 let poolReady = 0
1068 pool.emitter.on(PoolEvents.ready, info => {
1069 ++poolReady
1070 poolInfo = info
1071 })
1072 await waitPoolEvents(pool, PoolEvents.ready, 1)
1073 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1074 expect(poolReady).toBe(1)
1075 expect(poolInfo).toStrictEqual({
1076 version,
1077 type: PoolTypes.dynamic,
1078 worker: WorkerTypes.cluster,
1079 started: true,
1080 ready: true,
1081 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1082 strategyRetries: expect.any(Number),
1083 minSize: expect.any(Number),
1084 maxSize: expect.any(Number),
1085 workerNodes: expect.any(Number),
1086 idleWorkerNodes: expect.any(Number),
1087 busyWorkerNodes: expect.any(Number),
1088 executedTasks: expect.any(Number),
1089 executingTasks: expect.any(Number),
1090 failedTasks: expect.any(Number)
1091 })
1092 await pool.destroy()
1093 })
1094
1095 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1096 const pool = new FixedThreadPool(
1097 numberOfWorkers,
1098 './tests/worker-files/thread/testWorker.mjs'
1099 )
1100 expect(pool.emitter.eventNames()).toStrictEqual([])
1101 const promises = new Set()
1102 let poolBusy = 0
1103 let poolInfo
1104 pool.emitter.on(PoolEvents.busy, info => {
1105 ++poolBusy
1106 poolInfo = info
1107 })
1108 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1109 for (let i = 0; i < numberOfWorkers * 2; i++) {
1110 promises.add(pool.execute())
1111 }
1112 await Promise.all(promises)
1113 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1114 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1115 expect(poolBusy).toBe(numberOfWorkers + 1)
1116 expect(poolInfo).toStrictEqual({
1117 version,
1118 type: PoolTypes.fixed,
1119 worker: WorkerTypes.thread,
1120 started: true,
1121 ready: true,
1122 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1123 strategyRetries: expect.any(Number),
1124 minSize: expect.any(Number),
1125 maxSize: expect.any(Number),
1126 workerNodes: expect.any(Number),
1127 idleWorkerNodes: expect.any(Number),
1128 busyWorkerNodes: expect.any(Number),
1129 executedTasks: expect.any(Number),
1130 executingTasks: expect.any(Number),
1131 failedTasks: expect.any(Number)
1132 })
1133 await pool.destroy()
1134 })
1135
1136 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1137 const pool = new DynamicThreadPool(
1138 Math.floor(numberOfWorkers / 2),
1139 numberOfWorkers,
1140 './tests/worker-files/thread/testWorker.mjs'
1141 )
1142 expect(pool.emitter.eventNames()).toStrictEqual([])
1143 const promises = new Set()
1144 let poolFull = 0
1145 let poolInfo
1146 pool.emitter.on(PoolEvents.full, info => {
1147 ++poolFull
1148 poolInfo = info
1149 })
1150 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1151 for (let i = 0; i < numberOfWorkers * 2; i++) {
1152 promises.add(pool.execute())
1153 }
1154 await Promise.all(promises)
1155 expect(poolFull).toBe(1)
1156 expect(poolInfo).toStrictEqual({
1157 version,
1158 type: PoolTypes.dynamic,
1159 worker: WorkerTypes.thread,
1160 started: true,
1161 ready: true,
1162 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1163 strategyRetries: expect.any(Number),
1164 minSize: expect.any(Number),
1165 maxSize: expect.any(Number),
1166 workerNodes: expect.any(Number),
1167 idleWorkerNodes: expect.any(Number),
1168 busyWorkerNodes: expect.any(Number),
1169 executedTasks: expect.any(Number),
1170 executingTasks: expect.any(Number),
1171 failedTasks: expect.any(Number)
1172 })
1173 await pool.destroy()
1174 })
1175
1176 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1177 const pool = new FixedThreadPool(
1178 numberOfWorkers,
1179 './tests/worker-files/thread/testWorker.mjs',
1180 {
1181 enableTasksQueue: true
1182 }
1183 )
1184 stub(pool, 'hasBackPressure').returns(true)
1185 expect(pool.emitter.eventNames()).toStrictEqual([])
1186 const promises = new Set()
1187 let poolBackPressure = 0
1188 let poolInfo
1189 pool.emitter.on(PoolEvents.backPressure, info => {
1190 ++poolBackPressure
1191 poolInfo = info
1192 })
1193 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1194 for (let i = 0; i < numberOfWorkers + 1; i++) {
1195 promises.add(pool.execute())
1196 }
1197 await Promise.all(promises)
1198 expect(poolBackPressure).toBe(1)
1199 expect(poolInfo).toStrictEqual({
1200 version,
1201 type: PoolTypes.fixed,
1202 worker: WorkerTypes.thread,
1203 started: true,
1204 ready: true,
1205 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1206 strategyRetries: expect.any(Number),
1207 minSize: expect.any(Number),
1208 maxSize: expect.any(Number),
1209 workerNodes: expect.any(Number),
1210 idleWorkerNodes: expect.any(Number),
1211 stealingWorkerNodes: expect.any(Number),
1212 busyWorkerNodes: expect.any(Number),
1213 executedTasks: expect.any(Number),
1214 executingTasks: expect.any(Number),
1215 maxQueuedTasks: expect.any(Number),
1216 queuedTasks: expect.any(Number),
1217 backPressure: true,
1218 stolenTasks: expect.any(Number),
1219 failedTasks: expect.any(Number)
1220 })
1221 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1222 await pool.destroy()
1223 })
1224
1225 it('Verify that destroy() waits for queued tasks to finish', async () => {
1226 const tasksFinishedTimeout = 2500
1227 const pool = new FixedThreadPool(
1228 numberOfWorkers,
1229 './tests/worker-files/thread/asyncWorker.mjs',
1230 {
1231 enableTasksQueue: true,
1232 tasksQueueOptions: { tasksFinishedTimeout }
1233 }
1234 )
1235 const maxMultiplier = 4
1236 let tasksFinished = 0
1237 for (const workerNode of pool.workerNodes) {
1238 workerNode.on('taskFinished', () => {
1239 ++tasksFinished
1240 })
1241 }
1242 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1243 pool.execute()
1244 }
1245 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1246 const startTime = performance.now()
1247 await pool.destroy()
1248 const elapsedTime = performance.now() - startTime
1249 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1250 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1251 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1252 })
1253
1254 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1255 const tasksFinishedTimeout = 1000
1256 const pool = new FixedThreadPool(
1257 numberOfWorkers,
1258 './tests/worker-files/thread/asyncWorker.mjs',
1259 {
1260 enableTasksQueue: true,
1261 tasksQueueOptions: { tasksFinishedTimeout }
1262 }
1263 )
1264 const maxMultiplier = 4
1265 let tasksFinished = 0
1266 for (const workerNode of pool.workerNodes) {
1267 workerNode.on('taskFinished', () => {
1268 ++tasksFinished
1269 })
1270 }
1271 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1272 pool.execute()
1273 }
1274 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1275 const startTime = performance.now()
1276 await pool.destroy()
1277 const elapsedTime = performance.now() - startTime
1278 expect(tasksFinished).toBe(0)
1279 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1280 })
1281
1282 it('Verify that pool asynchronous resource track tasks execution', async () => {
1283 let taskAsyncId
1284 let initCalls = 0
1285 let beforeCalls = 0
1286 let afterCalls = 0
1287 let resolveCalls = 0
1288 const hook = createHook({
1289 init (asyncId, type) {
1290 if (type === 'poolifier:task') {
1291 initCalls++
1292 taskAsyncId = asyncId
1293 }
1294 },
1295 before (asyncId) {
1296 if (asyncId === taskAsyncId) beforeCalls++
1297 },
1298 after (asyncId) {
1299 if (asyncId === taskAsyncId) afterCalls++
1300 },
1301 promiseResolve () {
1302 if (executionAsyncId() === taskAsyncId) resolveCalls++
1303 }
1304 })
1305 const pool = new FixedThreadPool(
1306 numberOfWorkers,
1307 './tests/worker-files/thread/testWorker.mjs'
1308 )
1309 hook.enable()
1310 await pool.execute()
1311 hook.disable()
1312 expect(initCalls).toBe(1)
1313 expect(beforeCalls).toBe(1)
1314 expect(afterCalls).toBe(1)
1315 expect(resolveCalls).toBe(1)
1316 await pool.destroy()
1317 })
1318
1319 it('Verify that hasTaskFunction() is working', async () => {
1320 const dynamicThreadPool = new DynamicThreadPool(
1321 Math.floor(numberOfWorkers / 2),
1322 numberOfWorkers,
1323 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1324 )
1325 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1326 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1327 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1328 true
1329 )
1330 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1331 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1333 await dynamicThreadPool.destroy()
1334 const fixedClusterPool = new FixedClusterPool(
1335 numberOfWorkers,
1336 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1337 )
1338 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1339 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1340 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1341 true
1342 )
1343 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1344 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1346 await fixedClusterPool.destroy()
1347 })
1348
1349 it('Verify that addTaskFunction() is working', async () => {
1350 const dynamicThreadPool = new DynamicThreadPool(
1351 Math.floor(numberOfWorkers / 2),
1352 numberOfWorkers,
1353 './tests/worker-files/thread/testWorker.mjs'
1354 )
1355 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1356 await expect(
1357 dynamicThreadPool.addTaskFunction(0, () => {})
1358 ).rejects.toThrow(new TypeError('name argument must be a string'))
1359 await expect(
1360 dynamicThreadPool.addTaskFunction('', () => {})
1361 ).rejects.toThrow(
1362 new TypeError('name argument must not be an empty string')
1363 )
1364 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1365 new TypeError('fn argument must be a function')
1366 )
1367 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1368 new TypeError('fn argument must be a function')
1369 )
1370 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1371 DEFAULT_TASK_NAME,
1372 'test'
1373 ])
1374 const echoTaskFunction = data => {
1375 return data
1376 }
1377 await expect(
1378 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1379 ).resolves.toBe(true)
1380 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1381 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1382 echoTaskFunction
1383 )
1384 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1385 DEFAULT_TASK_NAME,
1386 'test',
1387 'echo'
1388 ])
1389 const taskFunctionData = { test: 'test' }
1390 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1391 expect(echoResult).toStrictEqual(taskFunctionData)
1392 for (const workerNode of dynamicThreadPool.workerNodes) {
1393 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1394 tasks: {
1395 executed: expect.any(Number),
1396 executing: 0,
1397 queued: 0,
1398 sequentiallyStolen: 0,
1399 stolen: 0,
1400 failed: 0
1401 },
1402 runTime: {
1403 history: new CircularArray()
1404 },
1405 waitTime: {
1406 history: new CircularArray()
1407 },
1408 elu: {
1409 idle: {
1410 history: new CircularArray()
1411 },
1412 active: {
1413 history: new CircularArray()
1414 }
1415 }
1416 })
1417 }
1418 await dynamicThreadPool.destroy()
1419 })
1420
1421 it('Verify that removeTaskFunction() is working', async () => {
1422 const dynamicThreadPool = new DynamicThreadPool(
1423 Math.floor(numberOfWorkers / 2),
1424 numberOfWorkers,
1425 './tests/worker-files/thread/testWorker.mjs'
1426 )
1427 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1428 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1429 DEFAULT_TASK_NAME,
1430 'test'
1431 ])
1432 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1433 new Error('Cannot remove a task function not handled on the pool side')
1434 )
1435 const echoTaskFunction = data => {
1436 return data
1437 }
1438 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1439 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1440 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1441 echoTaskFunction
1442 )
1443 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1444 DEFAULT_TASK_NAME,
1445 'test',
1446 'echo'
1447 ])
1448 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1449 true
1450 )
1451 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1452 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1453 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1454 DEFAULT_TASK_NAME,
1455 'test'
1456 ])
1457 await dynamicThreadPool.destroy()
1458 })
1459
1460 it('Verify that listTaskFunctionNames() is working', async () => {
1461 const dynamicThreadPool = new DynamicThreadPool(
1462 Math.floor(numberOfWorkers / 2),
1463 numberOfWorkers,
1464 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1465 )
1466 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1467 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1468 DEFAULT_TASK_NAME,
1469 'jsonIntegerSerialization',
1470 'factorial',
1471 'fibonacci'
1472 ])
1473 await dynamicThreadPool.destroy()
1474 const fixedClusterPool = new FixedClusterPool(
1475 numberOfWorkers,
1476 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1477 )
1478 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1479 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1480 DEFAULT_TASK_NAME,
1481 'jsonIntegerSerialization',
1482 'factorial',
1483 'fibonacci'
1484 ])
1485 await fixedClusterPool.destroy()
1486 })
1487
1488 it('Verify that setDefaultTaskFunction() is working', async () => {
1489 const dynamicThreadPool = new DynamicThreadPool(
1490 Math.floor(numberOfWorkers / 2),
1491 numberOfWorkers,
1492 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1493 )
1494 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1495 const workerId = dynamicThreadPool.workerNodes[0].info.id
1496 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1497 new Error(
1498 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1499 )
1500 )
1501 await expect(
1502 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1503 ).rejects.toThrow(
1504 new Error(
1505 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1506 )
1507 )
1508 await expect(
1509 dynamicThreadPool.setDefaultTaskFunction('unknown')
1510 ).rejects.toThrow(
1511 new Error(
1512 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1513 )
1514 )
1515 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1516 DEFAULT_TASK_NAME,
1517 'jsonIntegerSerialization',
1518 'factorial',
1519 'fibonacci'
1520 ])
1521 await expect(
1522 dynamicThreadPool.setDefaultTaskFunction('factorial')
1523 ).resolves.toBe(true)
1524 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 DEFAULT_TASK_NAME,
1526 'factorial',
1527 'jsonIntegerSerialization',
1528 'fibonacci'
1529 ])
1530 await expect(
1531 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1532 ).resolves.toBe(true)
1533 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1534 DEFAULT_TASK_NAME,
1535 'fibonacci',
1536 'jsonIntegerSerialization',
1537 'factorial'
1538 ])
1539 await dynamicThreadPool.destroy()
1540 })
1541
1542 it('Verify that multiple task functions worker is working', async () => {
1543 const pool = new DynamicClusterPool(
1544 Math.floor(numberOfWorkers / 2),
1545 numberOfWorkers,
1546 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1547 )
1548 const data = { n: 10 }
1549 const result0 = await pool.execute(data)
1550 expect(result0).toStrictEqual({ ok: 1 })
1551 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1552 expect(result1).toStrictEqual({ ok: 1 })
1553 const result2 = await pool.execute(data, 'factorial')
1554 expect(result2).toBe(3628800)
1555 const result3 = await pool.execute(data, 'fibonacci')
1556 expect(result3).toBe(55)
1557 expect(pool.info.executingTasks).toBe(0)
1558 expect(pool.info.executedTasks).toBe(4)
1559 for (const workerNode of pool.workerNodes) {
1560 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1561 DEFAULT_TASK_NAME,
1562 'jsonIntegerSerialization',
1563 'factorial',
1564 'fibonacci'
1565 ])
1566 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1567 for (const name of pool.listTaskFunctionNames()) {
1568 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1569 tasks: {
1570 executed: expect.any(Number),
1571 executing: 0,
1572 failed: 0,
1573 queued: 0,
1574 sequentiallyStolen: 0,
1575 stolen: 0
1576 },
1577 runTime: {
1578 history: expect.any(CircularArray)
1579 },
1580 waitTime: {
1581 history: expect.any(CircularArray)
1582 },
1583 elu: {
1584 idle: {
1585 history: expect.any(CircularArray)
1586 },
1587 active: {
1588 history: expect.any(CircularArray)
1589 }
1590 }
1591 })
1592 expect(
1593 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1594 ).toBeGreaterThan(0)
1595 }
1596 expect(
1597 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1598 ).toStrictEqual(
1599 workerNode.getTaskFunctionWorkerUsage(
1600 workerNode.info.taskFunctionNames[1]
1601 )
1602 )
1603 }
1604 await pool.destroy()
1605 })
1606
1607 it('Verify sendKillMessageToWorker()', async () => {
1608 const pool = new DynamicClusterPool(
1609 Math.floor(numberOfWorkers / 2),
1610 numberOfWorkers,
1611 './tests/worker-files/cluster/testWorker.cjs'
1612 )
1613 const workerNodeKey = 0
1614 await expect(
1615 pool.sendKillMessageToWorker(workerNodeKey)
1616 ).resolves.toBeUndefined()
1617 await pool.destroy()
1618 })
1619
1620 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1621 const pool = new DynamicClusterPool(
1622 Math.floor(numberOfWorkers / 2),
1623 numberOfWorkers,
1624 './tests/worker-files/cluster/testWorker.cjs'
1625 )
1626 const workerNodeKey = 0
1627 await expect(
1628 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1629 taskFunctionOperation: 'add',
1630 taskFunctionName: 'empty',
1631 taskFunction: (() => {}).toString()
1632 })
1633 ).resolves.toBe(true)
1634 expect(
1635 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1636 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1637 await pool.destroy()
1638 })
1639
1640 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1641 const pool = new DynamicClusterPool(
1642 Math.floor(numberOfWorkers / 2),
1643 numberOfWorkers,
1644 './tests/worker-files/cluster/testWorker.cjs'
1645 )
1646 await expect(
1647 pool.sendTaskFunctionOperationToWorkers({
1648 taskFunctionOperation: 'add',
1649 taskFunctionName: 'empty',
1650 taskFunction: (() => {}).toString()
1651 })
1652 ).resolves.toBe(true)
1653 for (const workerNode of pool.workerNodes) {
1654 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1655 DEFAULT_TASK_NAME,
1656 'test',
1657 'empty'
1658 ])
1659 }
1660 await pool.destroy()
1661 })
1662 })