docs: refine changelog entries
[poolifier.git] / tests / pools / abstract-pool.test.mjs
... / ...
CommitLineData
1import { EventEmitterAsyncResource } from 'node:events'
2import { dirname, join } from 'node:path'
3import { readFileSync } from 'node:fs'
4import { fileURLToPath } from 'node:url'
5import { createHook, executionAsyncId } from 'node:async_hooks'
6import { expect } from 'expect'
7import { restore, stub } from 'sinon'
8import {
9 DynamicClusterPool,
10 DynamicThreadPool,
11 FixedClusterPool,
12 FixedThreadPool,
13 PoolEvents,
14 PoolTypes,
15 WorkerChoiceStrategies,
16 WorkerTypes
17} from '../../lib/index.cjs'
18import { CircularArray } from '../../lib/circular-array.cjs'
19import { Deque } from '../../lib/deque.cjs'
20import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21import { waitPoolEvents } from '../test-utils.cjs'
22import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23
24describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
26 readFileSync(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
28 'utf8'
29 )
30 ).version
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
33 isMain () {
34 return false
35 }
36 }
37
38 afterEach(() => {
39 restore()
40 })
41
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
44 numberOfWorkers,
45 './tests/worker-files/thread/testWorker.mjs'
46 )
47 expect(pool).toBeInstanceOf(FixedThreadPool)
48 await pool.destroy()
49 })
50
51 it('Verify that pool cannot be created from a non main thread/process', () => {
52 expect(
53 () =>
54 new StubPoolWithIsMain(
55 numberOfWorkers,
56 './tests/worker-files/thread/testWorker.mjs',
57 {
58 errorHandler: e => console.error(e)
59 }
60 )
61 ).toThrow(
62 new Error(
63 'Cannot start a pool from a worker with the same type as the pool'
64 )
65 )
66 })
67
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
70 numberOfWorkers,
71 './tests/worker-files/thread/testWorker.mjs'
72 )
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
76 await pool.destroy()
77 })
78
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
82 )
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
85 )
86 expect(
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
89 })
90
91 it('Verify that numberOfWorkers is checked', () => {
92 expect(
93 () =>
94 new FixedThreadPool(
95 undefined,
96 './tests/worker-files/thread/testWorker.mjs'
97 )
98 ).toThrow(
99 new Error(
100 'Cannot instantiate a pool without specifying the number of workers'
101 )
102 )
103 })
104
105 it('Verify that a negative number of workers is checked', () => {
106 expect(
107 () =>
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
109 ).toThrow(
110 new RangeError(
111 'Cannot instantiate a pool with a negative number of workers'
112 )
113 )
114 })
115
116 it('Verify that a non integer number of workers is checked', () => {
117 expect(
118 () =>
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
120 ).toThrow(
121 new TypeError(
122 'Cannot instantiate a pool with a non safe integer number of workers'
123 )
124 )
125 })
126
127 it('Verify that pool arguments number and pool type are checked', () => {
128 expect(
129 () =>
130 new FixedThreadPool(
131 numberOfWorkers,
132 './tests/worker-files/thread/testWorker.mjs',
133 undefined,
134 numberOfWorkers * 2
135 )
136 ).toThrow(
137 new Error(
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
139 )
140 )
141 })
142
143 it('Verify that dynamic pool sizing is checked', () => {
144 expect(
145 () =>
146 new DynamicClusterPool(
147 1,
148 undefined,
149 './tests/worker-files/cluster/testWorker.cjs'
150 )
151 ).toThrow(
152 new TypeError(
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
154 )
155 )
156 expect(
157 () =>
158 new DynamicThreadPool(
159 0.5,
160 1,
161 './tests/worker-files/thread/testWorker.mjs'
162 )
163 ).toThrow(
164 new TypeError(
165 'Cannot instantiate a pool with a non safe integer number of workers'
166 )
167 )
168 expect(
169 () =>
170 new DynamicClusterPool(
171 0,
172 0.5,
173 './tests/worker-files/cluster/testWorker.cjs'
174 )
175 ).toThrow(
176 new TypeError(
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
178 )
179 )
180 expect(
181 () =>
182 new DynamicThreadPool(
183 2,
184 1,
185 './tests/worker-files/thread/testWorker.mjs'
186 )
187 ).toThrow(
188 new RangeError(
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
190 )
191 )
192 expect(
193 () =>
194 new DynamicThreadPool(
195 0,
196 0,
197 './tests/worker-files/thread/testWorker.mjs'
198 )
199 ).toThrow(
200 new RangeError(
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
202 )
203 )
204 expect(
205 () =>
206 new DynamicClusterPool(
207 1,
208 1,
209 './tests/worker-files/cluster/testWorker.cjs'
210 )
211 ).toThrow(
212 new RangeError(
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
214 )
215 )
216 })
217
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
220 numberOfWorkers,
221 './tests/worker-files/thread/testWorker.mjs'
222 )
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.emitter.eventNames()).toStrictEqual([])
225 expect(pool.opts).toStrictEqual({
226 startWorkers: true,
227 enableEvents: true,
228 restartWorkerOnError: true,
229 enableTasksQueue: false,
230 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 })
232 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
233 .workerChoiceStrategies) {
234 expect(workerChoiceStrategy.opts).toStrictEqual({
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
241 })
242 })
243 }
244 await pool.destroy()
245 const testHandler = () => console.info('test handler executed')
246 pool = new FixedThreadPool(
247 numberOfWorkers,
248 './tests/worker-files/thread/testWorker.mjs',
249 {
250 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
251 workerChoiceStrategyOptions: {
252 runTime: { median: true },
253 weights: { 0: 300, 1: 200 }
254 },
255 enableEvents: false,
256 restartWorkerOnError: false,
257 enableTasksQueue: true,
258 tasksQueueOptions: { concurrency: 2 },
259 messageHandler: testHandler,
260 errorHandler: testHandler,
261 onlineHandler: testHandler,
262 exitHandler: testHandler
263 }
264 )
265 expect(pool.emitter).toBeUndefined()
266 expect(pool.opts).toStrictEqual({
267 startWorkers: true,
268 enableEvents: false,
269 restartWorkerOnError: false,
270 enableTasksQueue: true,
271 tasksQueueOptions: {
272 concurrency: 2,
273 size: Math.pow(numberOfWorkers, 2),
274 taskStealing: true,
275 tasksStealingOnBackPressure: true,
276 tasksFinishedTimeout: 2000
277 },
278 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
279 workerChoiceStrategyOptions: {
280 runTime: { median: true },
281 weights: { 0: 300, 1: 200 }
282 },
283 onlineHandler: testHandler,
284 messageHandler: testHandler,
285 errorHandler: testHandler,
286 exitHandler: testHandler
287 })
288 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
289 .workerChoiceStrategies) {
290 expect(workerChoiceStrategy.opts).toStrictEqual({
291 runTime: { median: true },
292 waitTime: { median: false },
293 elu: { median: false },
294 weights: { 0: 300, 1: 200 }
295 })
296 }
297 await pool.destroy()
298 })
299
300 it('Verify that pool options are validated', () => {
301 expect(
302 () =>
303 new FixedThreadPool(
304 numberOfWorkers,
305 './tests/worker-files/thread/testWorker.mjs',
306 {
307 workerChoiceStrategy: 'invalidStrategy'
308 }
309 )
310 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
311 expect(
312 () =>
313 new FixedThreadPool(
314 numberOfWorkers,
315 './tests/worker-files/thread/testWorker.mjs',
316 {
317 workerChoiceStrategyOptions: { weights: {} }
318 }
319 )
320 ).toThrow(
321 new Error(
322 'Invalid worker choice strategy options: must have a weight for each worker node'
323 )
324 )
325 expect(
326 () =>
327 new FixedThreadPool(
328 numberOfWorkers,
329 './tests/worker-files/thread/testWorker.mjs',
330 {
331 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
332 }
333 )
334 ).toThrow(
335 new Error(
336 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
337 )
338 )
339 expect(
340 () =>
341 new FixedThreadPool(
342 numberOfWorkers,
343 './tests/worker-files/thread/testWorker.mjs',
344 {
345 enableTasksQueue: true,
346 tasksQueueOptions: 'invalidTasksQueueOptions'
347 }
348 )
349 ).toThrow(
350 new TypeError('Invalid tasks queue options: must be a plain object')
351 )
352 expect(
353 () =>
354 new FixedThreadPool(
355 numberOfWorkers,
356 './tests/worker-files/thread/testWorker.mjs',
357 {
358 enableTasksQueue: true,
359 tasksQueueOptions: { concurrency: 0 }
360 }
361 )
362 ).toThrow(
363 new RangeError(
364 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
365 )
366 )
367 expect(
368 () =>
369 new FixedThreadPool(
370 numberOfWorkers,
371 './tests/worker-files/thread/testWorker.mjs',
372 {
373 enableTasksQueue: true,
374 tasksQueueOptions: { concurrency: -1 }
375 }
376 )
377 ).toThrow(
378 new RangeError(
379 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
380 )
381 )
382 expect(
383 () =>
384 new FixedThreadPool(
385 numberOfWorkers,
386 './tests/worker-files/thread/testWorker.mjs',
387 {
388 enableTasksQueue: true,
389 tasksQueueOptions: { concurrency: 0.2 }
390 }
391 )
392 ).toThrow(
393 new TypeError('Invalid worker node tasks concurrency: must be an integer')
394 )
395 expect(
396 () =>
397 new FixedThreadPool(
398 numberOfWorkers,
399 './tests/worker-files/thread/testWorker.mjs',
400 {
401 enableTasksQueue: true,
402 tasksQueueOptions: { size: 0 }
403 }
404 )
405 ).toThrow(
406 new RangeError(
407 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
408 )
409 )
410 expect(
411 () =>
412 new FixedThreadPool(
413 numberOfWorkers,
414 './tests/worker-files/thread/testWorker.mjs',
415 {
416 enableTasksQueue: true,
417 tasksQueueOptions: { size: -1 }
418 }
419 )
420 ).toThrow(
421 new RangeError(
422 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
423 )
424 )
425 expect(
426 () =>
427 new FixedThreadPool(
428 numberOfWorkers,
429 './tests/worker-files/thread/testWorker.mjs',
430 {
431 enableTasksQueue: true,
432 tasksQueueOptions: { size: 0.2 }
433 }
434 )
435 ).toThrow(
436 new TypeError('Invalid worker node tasks queue size: must be an integer')
437 )
438 })
439
440 it('Verify that pool worker choice strategy options can be set', async () => {
441 const pool = new FixedThreadPool(
442 numberOfWorkers,
443 './tests/worker-files/thread/testWorker.mjs',
444 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
445 )
446 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
447 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
448 .workerChoiceStrategies) {
449 expect(workerChoiceStrategy.opts).toStrictEqual({
450 runTime: { median: false },
451 waitTime: { median: false },
452 elu: { median: false },
453 weights: expect.objectContaining({
454 0: expect.any(Number),
455 [pool.info.maxSize - 1]: expect.any(Number)
456 })
457 })
458 }
459 expect(
460 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
461 ).toStrictEqual({
462 runTime: {
463 aggregate: true,
464 average: true,
465 median: false
466 },
467 waitTime: {
468 aggregate: false,
469 average: false,
470 median: false
471 },
472 elu: {
473 aggregate: true,
474 average: true,
475 median: false
476 }
477 })
478 pool.setWorkerChoiceStrategyOptions({
479 runTime: { median: true },
480 elu: { median: true }
481 })
482 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
483 runTime: { median: true },
484 elu: { median: true }
485 })
486 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
487 .workerChoiceStrategies) {
488 expect(workerChoiceStrategy.opts).toStrictEqual({
489 runTime: { median: true },
490 waitTime: { median: false },
491 elu: { median: true },
492 weights: expect.objectContaining({
493 0: expect.any(Number),
494 [pool.info.maxSize - 1]: expect.any(Number)
495 })
496 })
497 }
498 expect(
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
500 ).toStrictEqual({
501 runTime: {
502 aggregate: true,
503 average: false,
504 median: true
505 },
506 waitTime: {
507 aggregate: false,
508 average: false,
509 median: false
510 },
511 elu: {
512 aggregate: true,
513 average: false,
514 median: true
515 }
516 })
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: false },
519 elu: { median: false }
520 })
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: false },
523 elu: { median: false }
524 })
525 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
526 .workerChoiceStrategies) {
527 expect(workerChoiceStrategy.opts).toStrictEqual({
528 runTime: { median: false },
529 waitTime: { median: false },
530 elu: { median: false },
531 weights: expect.objectContaining({
532 0: expect.any(Number),
533 [pool.info.maxSize - 1]: expect.any(Number)
534 })
535 })
536 }
537 expect(
538 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
539 ).toStrictEqual({
540 runTime: {
541 aggregate: true,
542 average: true,
543 median: false
544 },
545 waitTime: {
546 aggregate: false,
547 average: false,
548 median: false
549 },
550 elu: {
551 aggregate: true,
552 average: true,
553 median: false
554 }
555 })
556 expect(() =>
557 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
558 ).toThrow(
559 new TypeError(
560 'Invalid worker choice strategy options: must be a plain object'
561 )
562 )
563 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
564 new Error(
565 'Invalid worker choice strategy options: must have a weight for each worker node'
566 )
567 )
568 expect(() =>
569 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
570 ).toThrow(
571 new Error(
572 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
573 )
574 )
575 await pool.destroy()
576 })
577
578 it('Verify that pool tasks queue can be enabled/disabled', async () => {
579 const pool = new FixedThreadPool(
580 numberOfWorkers,
581 './tests/worker-files/thread/testWorker.mjs'
582 )
583 expect(pool.opts.enableTasksQueue).toBe(false)
584 expect(pool.opts.tasksQueueOptions).toBeUndefined()
585 pool.enableTasksQueue(true)
586 expect(pool.opts.enableTasksQueue).toBe(true)
587 expect(pool.opts.tasksQueueOptions).toStrictEqual({
588 concurrency: 1,
589 size: Math.pow(numberOfWorkers, 2),
590 taskStealing: true,
591 tasksStealingOnBackPressure: true,
592 tasksFinishedTimeout: 2000
593 })
594 pool.enableTasksQueue(true, { concurrency: 2 })
595 expect(pool.opts.enableTasksQueue).toBe(true)
596 expect(pool.opts.tasksQueueOptions).toStrictEqual({
597 concurrency: 2,
598 size: Math.pow(numberOfWorkers, 2),
599 taskStealing: true,
600 tasksStealingOnBackPressure: true,
601 tasksFinishedTimeout: 2000
602 })
603 pool.enableTasksQueue(false)
604 expect(pool.opts.enableTasksQueue).toBe(false)
605 expect(pool.opts.tasksQueueOptions).toBeUndefined()
606 await pool.destroy()
607 })
608
609 it('Verify that pool tasks queue options can be set', async () => {
610 const pool = new FixedThreadPool(
611 numberOfWorkers,
612 './tests/worker-files/thread/testWorker.mjs',
613 { enableTasksQueue: true }
614 )
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
616 concurrency: 1,
617 size: Math.pow(numberOfWorkers, 2),
618 taskStealing: true,
619 tasksStealingOnBackPressure: true,
620 tasksFinishedTimeout: 2000
621 })
622 for (const workerNode of pool.workerNodes) {
623 expect(workerNode.tasksQueueBackPressureSize).toBe(
624 pool.opts.tasksQueueOptions.size
625 )
626 }
627 pool.setTasksQueueOptions({
628 concurrency: 2,
629 size: 2,
630 taskStealing: false,
631 tasksStealingOnBackPressure: false,
632 tasksFinishedTimeout: 3000
633 })
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
635 concurrency: 2,
636 size: 2,
637 taskStealing: false,
638 tasksStealingOnBackPressure: false,
639 tasksFinishedTimeout: 3000
640 })
641 for (const workerNode of pool.workerNodes) {
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
644 )
645 }
646 pool.setTasksQueueOptions({
647 concurrency: 1,
648 taskStealing: true,
649 tasksStealingOnBackPressure: true
650 })
651 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 concurrency: 1,
653 size: Math.pow(numberOfWorkers, 2),
654 taskStealing: true,
655 tasksStealingOnBackPressure: true,
656 tasksFinishedTimeout: 2000
657 })
658 for (const workerNode of pool.workerNodes) {
659 expect(workerNode.tasksQueueBackPressureSize).toBe(
660 pool.opts.tasksQueueOptions.size
661 )
662 }
663 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
664 new TypeError('Invalid tasks queue options: must be a plain object')
665 )
666 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
667 new RangeError(
668 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
669 )
670 )
671 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
672 new RangeError(
673 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
674 )
675 )
676 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
677 new TypeError('Invalid worker node tasks concurrency: must be an integer')
678 )
679 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
680 new RangeError(
681 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
682 )
683 )
684 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
685 new RangeError(
686 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
687 )
688 )
689 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
690 new TypeError('Invalid worker node tasks queue size: must be an integer')
691 )
692 await pool.destroy()
693 })
694
695 it('Verify that pool info is set', async () => {
696 let pool = new FixedThreadPool(
697 numberOfWorkers,
698 './tests/worker-files/thread/testWorker.mjs'
699 )
700 expect(pool.info).toStrictEqual({
701 version,
702 type: PoolTypes.fixed,
703 worker: WorkerTypes.thread,
704 started: true,
705 ready: true,
706 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
707 minSize: numberOfWorkers,
708 maxSize: numberOfWorkers,
709 workerNodes: numberOfWorkers,
710 idleWorkerNodes: numberOfWorkers,
711 busyWorkerNodes: 0,
712 executedTasks: 0,
713 executingTasks: 0,
714 failedTasks: 0
715 })
716 await pool.destroy()
717 pool = new DynamicClusterPool(
718 Math.floor(numberOfWorkers / 2),
719 numberOfWorkers,
720 './tests/worker-files/cluster/testWorker.cjs'
721 )
722 expect(pool.info).toStrictEqual({
723 version,
724 type: PoolTypes.dynamic,
725 worker: WorkerTypes.cluster,
726 started: true,
727 ready: true,
728 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
729 minSize: Math.floor(numberOfWorkers / 2),
730 maxSize: numberOfWorkers,
731 workerNodes: Math.floor(numberOfWorkers / 2),
732 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
733 busyWorkerNodes: 0,
734 executedTasks: 0,
735 executingTasks: 0,
736 failedTasks: 0
737 })
738 await pool.destroy()
739 })
740
741 it('Verify that pool worker tasks usage are initialized', async () => {
742 const pool = new FixedClusterPool(
743 numberOfWorkers,
744 './tests/worker-files/cluster/testWorker.cjs'
745 )
746 for (const workerNode of pool.workerNodes) {
747 expect(workerNode).toBeInstanceOf(WorkerNode)
748 expect(workerNode.usage).toStrictEqual({
749 tasks: {
750 executed: 0,
751 executing: 0,
752 queued: 0,
753 maxQueued: 0,
754 sequentiallyStolen: 0,
755 stolen: 0,
756 failed: 0
757 },
758 runTime: {
759 history: new CircularArray()
760 },
761 waitTime: {
762 history: new CircularArray()
763 },
764 elu: {
765 idle: {
766 history: new CircularArray()
767 },
768 active: {
769 history: new CircularArray()
770 }
771 }
772 })
773 }
774 await pool.destroy()
775 })
776
777 it('Verify that pool worker tasks queue are initialized', async () => {
778 let pool = new FixedClusterPool(
779 numberOfWorkers,
780 './tests/worker-files/cluster/testWorker.cjs'
781 )
782 for (const workerNode of pool.workerNodes) {
783 expect(workerNode).toBeInstanceOf(WorkerNode)
784 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
785 expect(workerNode.tasksQueue.size).toBe(0)
786 expect(workerNode.tasksQueue.maxSize).toBe(0)
787 }
788 await pool.destroy()
789 pool = new DynamicThreadPool(
790 Math.floor(numberOfWorkers / 2),
791 numberOfWorkers,
792 './tests/worker-files/thread/testWorker.mjs'
793 )
794 for (const workerNode of pool.workerNodes) {
795 expect(workerNode).toBeInstanceOf(WorkerNode)
796 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
797 expect(workerNode.tasksQueue.size).toBe(0)
798 expect(workerNode.tasksQueue.maxSize).toBe(0)
799 }
800 await pool.destroy()
801 })
802
803 it('Verify that pool worker info are initialized', async () => {
804 let pool = new FixedClusterPool(
805 numberOfWorkers,
806 './tests/worker-files/cluster/testWorker.cjs'
807 )
808 for (const workerNode of pool.workerNodes) {
809 expect(workerNode).toBeInstanceOf(WorkerNode)
810 expect(workerNode.info).toStrictEqual({
811 id: expect.any(Number),
812 type: WorkerTypes.cluster,
813 dynamic: false,
814 ready: true,
815 stealing: false
816 })
817 }
818 await pool.destroy()
819 pool = new DynamicThreadPool(
820 Math.floor(numberOfWorkers / 2),
821 numberOfWorkers,
822 './tests/worker-files/thread/testWorker.mjs'
823 )
824 for (const workerNode of pool.workerNodes) {
825 expect(workerNode).toBeInstanceOf(WorkerNode)
826 expect(workerNode.info).toStrictEqual({
827 id: expect.any(Number),
828 type: WorkerTypes.thread,
829 dynamic: false,
830 ready: true,
831 stealing: false
832 })
833 }
834 await pool.destroy()
835 })
836
837 it('Verify that pool statuses are checked at start or destroy', async () => {
838 const pool = new FixedThreadPool(
839 numberOfWorkers,
840 './tests/worker-files/thread/testWorker.mjs'
841 )
842 expect(pool.info.started).toBe(true)
843 expect(pool.info.ready).toBe(true)
844 expect(() => pool.start()).toThrow(
845 new Error('Cannot start an already started pool')
846 )
847 await pool.destroy()
848 expect(pool.info.started).toBe(false)
849 expect(pool.info.ready).toBe(false)
850 await expect(pool.destroy()).rejects.toThrow(
851 new Error('Cannot destroy an already destroyed pool')
852 )
853 })
854
855 it('Verify that pool can be started after initialization', async () => {
856 const pool = new FixedClusterPool(
857 numberOfWorkers,
858 './tests/worker-files/cluster/testWorker.cjs',
859 {
860 startWorkers: false
861 }
862 )
863 expect(pool.info.started).toBe(false)
864 expect(pool.info.ready).toBe(false)
865 expect(pool.readyEventEmitted).toBe(false)
866 expect(pool.workerNodes).toStrictEqual([])
867 await expect(pool.execute()).rejects.toThrow(
868 new Error('Cannot execute a task on not started pool')
869 )
870 pool.start()
871 expect(pool.info.started).toBe(true)
872 expect(pool.info.ready).toBe(true)
873 await waitPoolEvents(pool, PoolEvents.ready, 1)
874 expect(pool.readyEventEmitted).toBe(true)
875 expect(pool.workerNodes.length).toBe(numberOfWorkers)
876 for (const workerNode of pool.workerNodes) {
877 expect(workerNode).toBeInstanceOf(WorkerNode)
878 }
879 await pool.destroy()
880 })
881
882 it('Verify that pool execute() arguments are checked', async () => {
883 const pool = new FixedClusterPool(
884 numberOfWorkers,
885 './tests/worker-files/cluster/testWorker.cjs'
886 )
887 await expect(pool.execute(undefined, 0)).rejects.toThrow(
888 new TypeError('name argument must be a string')
889 )
890 await expect(pool.execute(undefined, '')).rejects.toThrow(
891 new TypeError('name argument must not be an empty string')
892 )
893 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
894 new TypeError('transferList argument must be an array')
895 )
896 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
897 "Task function 'unknown' not found"
898 )
899 await pool.destroy()
900 await expect(pool.execute()).rejects.toThrow(
901 new Error('Cannot execute a task on not started pool')
902 )
903 })
904
905 it('Verify that pool worker tasks usage are computed', async () => {
906 const pool = new FixedClusterPool(
907 numberOfWorkers,
908 './tests/worker-files/cluster/testWorker.cjs'
909 )
910 const promises = new Set()
911 const maxMultiplier = 2
912 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
913 promises.add(pool.execute())
914 }
915 for (const workerNode of pool.workerNodes) {
916 expect(workerNode.usage).toStrictEqual({
917 tasks: {
918 executed: 0,
919 executing: maxMultiplier,
920 queued: 0,
921 maxQueued: 0,
922 sequentiallyStolen: 0,
923 stolen: 0,
924 failed: 0
925 },
926 runTime: {
927 history: expect.any(CircularArray)
928 },
929 waitTime: {
930 history: expect.any(CircularArray)
931 },
932 elu: {
933 idle: {
934 history: expect.any(CircularArray)
935 },
936 active: {
937 history: expect.any(CircularArray)
938 }
939 }
940 })
941 }
942 await Promise.all(promises)
943 for (const workerNode of pool.workerNodes) {
944 expect(workerNode.usage).toStrictEqual({
945 tasks: {
946 executed: maxMultiplier,
947 executing: 0,
948 queued: 0,
949 maxQueued: 0,
950 sequentiallyStolen: 0,
951 stolen: 0,
952 failed: 0
953 },
954 runTime: {
955 history: expect.any(CircularArray)
956 },
957 waitTime: {
958 history: expect.any(CircularArray)
959 },
960 elu: {
961 idle: {
962 history: expect.any(CircularArray)
963 },
964 active: {
965 history: expect.any(CircularArray)
966 }
967 }
968 })
969 }
970 await pool.destroy()
971 })
972
973 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
974 const pool = new DynamicThreadPool(
975 Math.floor(numberOfWorkers / 2),
976 numberOfWorkers,
977 './tests/worker-files/thread/testWorker.mjs'
978 )
979 const promises = new Set()
980 const maxMultiplier = 2
981 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
982 promises.add(pool.execute())
983 }
984 await Promise.all(promises)
985 for (const workerNode of pool.workerNodes) {
986 expect(workerNode.usage).toStrictEqual({
987 tasks: {
988 executed: expect.any(Number),
989 executing: 0,
990 queued: 0,
991 maxQueued: 0,
992 sequentiallyStolen: 0,
993 stolen: 0,
994 failed: 0
995 },
996 runTime: {
997 history: expect.any(CircularArray)
998 },
999 waitTime: {
1000 history: expect.any(CircularArray)
1001 },
1002 elu: {
1003 idle: {
1004 history: expect.any(CircularArray)
1005 },
1006 active: {
1007 history: expect.any(CircularArray)
1008 }
1009 }
1010 })
1011 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1012 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1013 numberOfWorkers * maxMultiplier
1014 )
1015 expect(workerNode.usage.runTime.history.length).toBe(0)
1016 expect(workerNode.usage.waitTime.history.length).toBe(0)
1017 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1018 expect(workerNode.usage.elu.active.history.length).toBe(0)
1019 }
1020 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1021 for (const workerNode of pool.workerNodes) {
1022 expect(workerNode.usage).toStrictEqual({
1023 tasks: {
1024 executed: 0,
1025 executing: 0,
1026 queued: 0,
1027 maxQueued: 0,
1028 sequentiallyStolen: 0,
1029 stolen: 0,
1030 failed: 0
1031 },
1032 runTime: {
1033 history: expect.any(CircularArray)
1034 },
1035 waitTime: {
1036 history: expect.any(CircularArray)
1037 },
1038 elu: {
1039 idle: {
1040 history: expect.any(CircularArray)
1041 },
1042 active: {
1043 history: expect.any(CircularArray)
1044 }
1045 }
1046 })
1047 expect(workerNode.usage.runTime.history.length).toBe(0)
1048 expect(workerNode.usage.waitTime.history.length).toBe(0)
1049 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1050 expect(workerNode.usage.elu.active.history.length).toBe(0)
1051 }
1052 await pool.destroy()
1053 })
1054
1055 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1056 const pool = new DynamicClusterPool(
1057 Math.floor(numberOfWorkers / 2),
1058 numberOfWorkers,
1059 './tests/worker-files/cluster/testWorker.cjs'
1060 )
1061 expect(pool.emitter.eventNames()).toStrictEqual([])
1062 let poolInfo
1063 let poolReady = 0
1064 pool.emitter.on(PoolEvents.ready, info => {
1065 ++poolReady
1066 poolInfo = info
1067 })
1068 await waitPoolEvents(pool, PoolEvents.ready, 1)
1069 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1070 expect(poolReady).toBe(1)
1071 expect(poolInfo).toStrictEqual({
1072 version,
1073 type: PoolTypes.dynamic,
1074 worker: WorkerTypes.cluster,
1075 started: true,
1076 ready: true,
1077 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1078 minSize: expect.any(Number),
1079 maxSize: expect.any(Number),
1080 workerNodes: expect.any(Number),
1081 idleWorkerNodes: expect.any(Number),
1082 busyWorkerNodes: expect.any(Number),
1083 executedTasks: expect.any(Number),
1084 executingTasks: expect.any(Number),
1085 failedTasks: expect.any(Number)
1086 })
1087 await pool.destroy()
1088 })
1089
1090 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1091 const pool = new FixedThreadPool(
1092 numberOfWorkers,
1093 './tests/worker-files/thread/testWorker.mjs'
1094 )
1095 expect(pool.emitter.eventNames()).toStrictEqual([])
1096 const promises = new Set()
1097 let poolBusy = 0
1098 let poolInfo
1099 pool.emitter.on(PoolEvents.busy, info => {
1100 ++poolBusy
1101 poolInfo = info
1102 })
1103 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1104 for (let i = 0; i < numberOfWorkers * 2; i++) {
1105 promises.add(pool.execute())
1106 }
1107 await Promise.all(promises)
1108 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1109 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1110 expect(poolBusy).toBe(numberOfWorkers + 1)
1111 expect(poolInfo).toStrictEqual({
1112 version,
1113 type: PoolTypes.fixed,
1114 worker: WorkerTypes.thread,
1115 started: true,
1116 ready: true,
1117 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1118 minSize: expect.any(Number),
1119 maxSize: expect.any(Number),
1120 workerNodes: expect.any(Number),
1121 idleWorkerNodes: expect.any(Number),
1122 busyWorkerNodes: expect.any(Number),
1123 executedTasks: expect.any(Number),
1124 executingTasks: expect.any(Number),
1125 failedTasks: expect.any(Number)
1126 })
1127 await pool.destroy()
1128 })
1129
1130 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1131 const pool = new DynamicThreadPool(
1132 Math.floor(numberOfWorkers / 2),
1133 numberOfWorkers,
1134 './tests/worker-files/thread/testWorker.mjs'
1135 )
1136 expect(pool.emitter.eventNames()).toStrictEqual([])
1137 const promises = new Set()
1138 let poolFull = 0
1139 let poolInfo
1140 pool.emitter.on(PoolEvents.full, info => {
1141 ++poolFull
1142 poolInfo = info
1143 })
1144 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1145 for (let i = 0; i < numberOfWorkers * 2; i++) {
1146 promises.add(pool.execute())
1147 }
1148 await Promise.all(promises)
1149 expect(poolFull).toBe(1)
1150 expect(poolInfo).toStrictEqual({
1151 version,
1152 type: PoolTypes.dynamic,
1153 worker: WorkerTypes.thread,
1154 started: true,
1155 ready: true,
1156 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1157 minSize: expect.any(Number),
1158 maxSize: expect.any(Number),
1159 workerNodes: expect.any(Number),
1160 idleWorkerNodes: expect.any(Number),
1161 busyWorkerNodes: expect.any(Number),
1162 executedTasks: expect.any(Number),
1163 executingTasks: expect.any(Number),
1164 failedTasks: expect.any(Number)
1165 })
1166 await pool.destroy()
1167 })
1168
1169 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1170 const pool = new FixedThreadPool(
1171 numberOfWorkers,
1172 './tests/worker-files/thread/testWorker.mjs',
1173 {
1174 enableTasksQueue: true
1175 }
1176 )
1177 stub(pool, 'hasBackPressure').returns(true)
1178 expect(pool.emitter.eventNames()).toStrictEqual([])
1179 const promises = new Set()
1180 let poolBackPressure = 0
1181 let poolInfo
1182 pool.emitter.on(PoolEvents.backPressure, info => {
1183 ++poolBackPressure
1184 poolInfo = info
1185 })
1186 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1187 for (let i = 0; i < numberOfWorkers + 1; i++) {
1188 promises.add(pool.execute())
1189 }
1190 await Promise.all(promises)
1191 expect(poolBackPressure).toBe(1)
1192 expect(poolInfo).toStrictEqual({
1193 version,
1194 type: PoolTypes.fixed,
1195 worker: WorkerTypes.thread,
1196 started: true,
1197 ready: true,
1198 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1199 minSize: expect.any(Number),
1200 maxSize: expect.any(Number),
1201 workerNodes: expect.any(Number),
1202 idleWorkerNodes: expect.any(Number),
1203 stealingWorkerNodes: expect.any(Number),
1204 busyWorkerNodes: expect.any(Number),
1205 executedTasks: expect.any(Number),
1206 executingTasks: expect.any(Number),
1207 maxQueuedTasks: expect.any(Number),
1208 queuedTasks: expect.any(Number),
1209 backPressure: true,
1210 stolenTasks: expect.any(Number),
1211 failedTasks: expect.any(Number)
1212 })
1213 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1214 await pool.destroy()
1215 })
1216
1217 it('Verify that destroy() waits for queued tasks to finish', async () => {
1218 const tasksFinishedTimeout = 2500
1219 const pool = new FixedThreadPool(
1220 numberOfWorkers,
1221 './tests/worker-files/thread/asyncWorker.mjs',
1222 {
1223 enableTasksQueue: true,
1224 tasksQueueOptions: { tasksFinishedTimeout }
1225 }
1226 )
1227 const maxMultiplier = 4
1228 let tasksFinished = 0
1229 for (const workerNode of pool.workerNodes) {
1230 workerNode.on('taskFinished', () => {
1231 ++tasksFinished
1232 })
1233 }
1234 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1235 pool.execute()
1236 }
1237 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1238 const startTime = performance.now()
1239 await pool.destroy()
1240 const elapsedTime = performance.now() - startTime
1241 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1242 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1243 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1244 })
1245
1246 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1247 const tasksFinishedTimeout = 1000
1248 const pool = new FixedThreadPool(
1249 numberOfWorkers,
1250 './tests/worker-files/thread/asyncWorker.mjs',
1251 {
1252 enableTasksQueue: true,
1253 tasksQueueOptions: { tasksFinishedTimeout }
1254 }
1255 )
1256 const maxMultiplier = 4
1257 let tasksFinished = 0
1258 for (const workerNode of pool.workerNodes) {
1259 workerNode.on('taskFinished', () => {
1260 ++tasksFinished
1261 })
1262 }
1263 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1264 pool.execute()
1265 }
1266 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1267 const startTime = performance.now()
1268 await pool.destroy()
1269 const elapsedTime = performance.now() - startTime
1270 expect(tasksFinished).toBe(0)
1271 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1272 })
1273
1274 it('Verify that pool asynchronous resource track tasks execution', async () => {
1275 let taskAsyncId
1276 let initCalls = 0
1277 let beforeCalls = 0
1278 let afterCalls = 0
1279 let resolveCalls = 0
1280 const hook = createHook({
1281 init (asyncId, type) {
1282 if (type === 'poolifier:task') {
1283 initCalls++
1284 taskAsyncId = asyncId
1285 }
1286 },
1287 before (asyncId) {
1288 if (asyncId === taskAsyncId) beforeCalls++
1289 },
1290 after (asyncId) {
1291 if (asyncId === taskAsyncId) afterCalls++
1292 },
1293 promiseResolve () {
1294 if (executionAsyncId() === taskAsyncId) resolveCalls++
1295 }
1296 })
1297 const pool = new FixedThreadPool(
1298 numberOfWorkers,
1299 './tests/worker-files/thread/testWorker.mjs'
1300 )
1301 hook.enable()
1302 await pool.execute()
1303 hook.disable()
1304 expect(initCalls).toBe(1)
1305 expect(beforeCalls).toBe(1)
1306 expect(afterCalls).toBe(1)
1307 expect(resolveCalls).toBe(1)
1308 await pool.destroy()
1309 })
1310
1311 it('Verify that hasTaskFunction() is working', async () => {
1312 const dynamicThreadPool = new DynamicThreadPool(
1313 Math.floor(numberOfWorkers / 2),
1314 numberOfWorkers,
1315 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1316 )
1317 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1318 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1319 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1320 true
1321 )
1322 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1324 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1325 await dynamicThreadPool.destroy()
1326 const fixedClusterPool = new FixedClusterPool(
1327 numberOfWorkers,
1328 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1329 )
1330 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1331 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1332 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1333 true
1334 )
1335 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1337 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1338 await fixedClusterPool.destroy()
1339 })
1340
1341 it('Verify that addTaskFunction() is working', async () => {
1342 const dynamicThreadPool = new DynamicThreadPool(
1343 Math.floor(numberOfWorkers / 2),
1344 numberOfWorkers,
1345 './tests/worker-files/thread/testWorker.mjs'
1346 )
1347 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1348 await expect(
1349 dynamicThreadPool.addTaskFunction(0, () => {})
1350 ).rejects.toThrow(new TypeError('name argument must be a string'))
1351 await expect(
1352 dynamicThreadPool.addTaskFunction('', () => {})
1353 ).rejects.toThrow(
1354 new TypeError('name argument must not be an empty string')
1355 )
1356 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1357 new TypeError('fn argument must be a function')
1358 )
1359 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1360 new TypeError('fn argument must be a function')
1361 )
1362 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1363 DEFAULT_TASK_NAME,
1364 'test'
1365 ])
1366 const echoTaskFunction = data => {
1367 return data
1368 }
1369 await expect(
1370 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1371 ).resolves.toBe(true)
1372 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1373 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1374 echoTaskFunction
1375 )
1376 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1377 DEFAULT_TASK_NAME,
1378 'test',
1379 'echo'
1380 ])
1381 const taskFunctionData = { test: 'test' }
1382 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1383 expect(echoResult).toStrictEqual(taskFunctionData)
1384 for (const workerNode of dynamicThreadPool.workerNodes) {
1385 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1386 tasks: {
1387 executed: expect.any(Number),
1388 executing: 0,
1389 queued: 0,
1390 sequentiallyStolen: 0,
1391 stolen: 0,
1392 failed: 0
1393 },
1394 runTime: {
1395 history: new CircularArray()
1396 },
1397 waitTime: {
1398 history: new CircularArray()
1399 },
1400 elu: {
1401 idle: {
1402 history: new CircularArray()
1403 },
1404 active: {
1405 history: new CircularArray()
1406 }
1407 }
1408 })
1409 }
1410 await dynamicThreadPool.destroy()
1411 })
1412
1413 it('Verify that removeTaskFunction() is working', async () => {
1414 const dynamicThreadPool = new DynamicThreadPool(
1415 Math.floor(numberOfWorkers / 2),
1416 numberOfWorkers,
1417 './tests/worker-files/thread/testWorker.mjs'
1418 )
1419 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1421 DEFAULT_TASK_NAME,
1422 'test'
1423 ])
1424 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1425 new Error('Cannot remove a task function not handled on the pool side')
1426 )
1427 const echoTaskFunction = data => {
1428 return data
1429 }
1430 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1431 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1432 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1433 echoTaskFunction
1434 )
1435 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1436 DEFAULT_TASK_NAME,
1437 'test',
1438 'echo'
1439 ])
1440 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1441 true
1442 )
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1445 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1446 DEFAULT_TASK_NAME,
1447 'test'
1448 ])
1449 await dynamicThreadPool.destroy()
1450 })
1451
1452 it('Verify that listTaskFunctionNames() is working', async () => {
1453 const dynamicThreadPool = new DynamicThreadPool(
1454 Math.floor(numberOfWorkers / 2),
1455 numberOfWorkers,
1456 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1457 )
1458 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1459 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 DEFAULT_TASK_NAME,
1461 'jsonIntegerSerialization',
1462 'factorial',
1463 'fibonacci'
1464 ])
1465 await dynamicThreadPool.destroy()
1466 const fixedClusterPool = new FixedClusterPool(
1467 numberOfWorkers,
1468 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1469 )
1470 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1471 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1472 DEFAULT_TASK_NAME,
1473 'jsonIntegerSerialization',
1474 'factorial',
1475 'fibonacci'
1476 ])
1477 await fixedClusterPool.destroy()
1478 })
1479
1480 it('Verify that setDefaultTaskFunction() is working', async () => {
1481 const dynamicThreadPool = new DynamicThreadPool(
1482 Math.floor(numberOfWorkers / 2),
1483 numberOfWorkers,
1484 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1485 )
1486 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1487 const workerId = dynamicThreadPool.workerNodes[0].info.id
1488 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1489 new Error(
1490 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1491 )
1492 )
1493 await expect(
1494 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1495 ).rejects.toThrow(
1496 new Error(
1497 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1498 )
1499 )
1500 await expect(
1501 dynamicThreadPool.setDefaultTaskFunction('unknown')
1502 ).rejects.toThrow(
1503 new Error(
1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1505 )
1506 )
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 DEFAULT_TASK_NAME,
1509 'jsonIntegerSerialization',
1510 'factorial',
1511 'fibonacci'
1512 ])
1513 await expect(
1514 dynamicThreadPool.setDefaultTaskFunction('factorial')
1515 ).resolves.toBe(true)
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1517 DEFAULT_TASK_NAME,
1518 'factorial',
1519 'jsonIntegerSerialization',
1520 'fibonacci'
1521 ])
1522 await expect(
1523 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1524 ).resolves.toBe(true)
1525 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1526 DEFAULT_TASK_NAME,
1527 'fibonacci',
1528 'jsonIntegerSerialization',
1529 'factorial'
1530 ])
1531 await dynamicThreadPool.destroy()
1532 })
1533
1534 it('Verify that multiple task functions worker is working', async () => {
1535 const pool = new DynamicClusterPool(
1536 Math.floor(numberOfWorkers / 2),
1537 numberOfWorkers,
1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1539 )
1540 const data = { n: 10 }
1541 const result0 = await pool.execute(data)
1542 expect(result0).toStrictEqual({ ok: 1 })
1543 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1544 expect(result1).toStrictEqual({ ok: 1 })
1545 const result2 = await pool.execute(data, 'factorial')
1546 expect(result2).toBe(3628800)
1547 const result3 = await pool.execute(data, 'fibonacci')
1548 expect(result3).toBe(55)
1549 expect(pool.info.executingTasks).toBe(0)
1550 expect(pool.info.executedTasks).toBe(4)
1551 for (const workerNode of pool.workerNodes) {
1552 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1553 DEFAULT_TASK_NAME,
1554 'jsonIntegerSerialization',
1555 'factorial',
1556 'fibonacci'
1557 ])
1558 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1559 for (const name of pool.listTaskFunctionNames()) {
1560 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1561 tasks: {
1562 executed: expect.any(Number),
1563 executing: 0,
1564 failed: 0,
1565 queued: 0,
1566 sequentiallyStolen: 0,
1567 stolen: 0
1568 },
1569 runTime: {
1570 history: expect.any(CircularArray)
1571 },
1572 waitTime: {
1573 history: expect.any(CircularArray)
1574 },
1575 elu: {
1576 idle: {
1577 history: expect.any(CircularArray)
1578 },
1579 active: {
1580 history: expect.any(CircularArray)
1581 }
1582 }
1583 })
1584 expect(
1585 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1586 ).toBeGreaterThan(0)
1587 }
1588 expect(
1589 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1590 ).toStrictEqual(
1591 workerNode.getTaskFunctionWorkerUsage(
1592 workerNode.info.taskFunctionNames[1]
1593 )
1594 )
1595 }
1596 await pool.destroy()
1597 })
1598
1599 it('Verify sendKillMessageToWorker()', async () => {
1600 const pool = new DynamicClusterPool(
1601 Math.floor(numberOfWorkers / 2),
1602 numberOfWorkers,
1603 './tests/worker-files/cluster/testWorker.cjs'
1604 )
1605 const workerNodeKey = 0
1606 await expect(
1607 pool.sendKillMessageToWorker(workerNodeKey)
1608 ).resolves.toBeUndefined()
1609 await pool.destroy()
1610 })
1611
1612 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1613 const pool = new DynamicClusterPool(
1614 Math.floor(numberOfWorkers / 2),
1615 numberOfWorkers,
1616 './tests/worker-files/cluster/testWorker.cjs'
1617 )
1618 const workerNodeKey = 0
1619 await expect(
1620 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1621 taskFunctionOperation: 'add',
1622 taskFunctionName: 'empty',
1623 taskFunction: (() => {}).toString()
1624 })
1625 ).resolves.toBe(true)
1626 expect(
1627 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1628 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1629 await pool.destroy()
1630 })
1631
1632 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1633 const pool = new DynamicClusterPool(
1634 Math.floor(numberOfWorkers / 2),
1635 numberOfWorkers,
1636 './tests/worker-files/cluster/testWorker.cjs'
1637 )
1638 await expect(
1639 pool.sendTaskFunctionOperationToWorkers({
1640 taskFunctionOperation: 'add',
1641 taskFunctionName: 'empty',
1642 taskFunction: (() => {}).toString()
1643 })
1644 ).resolves.toBe(true)
1645 for (const workerNode of pool.workerNodes) {
1646 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1647 DEFAULT_TASK_NAME,
1648 'test',
1649 'empty'
1650 ])
1651 }
1652 await pool.destroy()
1653 })
1654})