a44cdea527fcef97363d15063f4d432fcb366ec6
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const sinon = require('sinon')
3 const {
4 DynamicClusterPool,
5 DynamicThreadPool,
6 FixedClusterPool,
7 FixedThreadPool,
8 PoolEvents,
9 PoolTypes,
10 WorkerChoiceStrategies,
11 WorkerTypes
12 } = require('../../../lib')
13 const { CircularArray } = require('../../../lib/circular-array')
14 const { Deque } = require('../../../lib/deque')
15 const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
16 const { version } = require('../../../package.json')
17 const { waitPoolEvents } = require('../../test-utils')
18
19 describe('Abstract pool test suite', () => {
// Pool size shared by the tests of this suite.
const numberOfWorkers = 2

/**
 * Fixed thread pool whose `isMain()` always answers `false`.
 * Used to simulate a pool being created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
26
// Restore everything sinon replaced during a test before the next one runs.
afterEach(function restoreSinon () {
  sinon.restore()
})
30
it('Simulate pool creation from a non main thread/process', () => {
  // The stub answers `false` to isMain(), so the constructor is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: (error) => console.error(error) }
    )
  const expectedError = new Error(
    'Cannot start a pool from a worker with the same type as the pool'
  )
  expect(createPoolFromWorker).toThrowError(expectedError)
})
47
it('Verify that pool statuses properties are set', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const threadPool = new FixedThreadPool(numberOfWorkers, workerFile)
  // Right after construction the pool must report started and no longer starting.
  const { starting, started } = threadPool
  expect(starting).toBe(false)
  expect(started).toBe(true)
  await threadPool.destroy()
})
57
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // No file path at all.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Empty string and non-string values are rejected with the same error.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A well-formed path pointing to a file that does not exist.
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
78
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no argument at all must be refused.
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
86
it('Verify that a negative number of workers is checked', () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const createNegativeSizedPool = () => new FixedClusterPool(-1, workerFile)
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
97
it('Verify that a non integer number of workers is checked', () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const createFractionalSizedPool = () => new FixedThreadPool(0.25, workerFile)
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
108
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry: pool class, minimum size, maximum size, worker file, expected error.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ]
  ]
  for (const [PoolClass, min, max, workerFile, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrowError(
      expectedError
    )
  }
})
175
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]
  // The pool options, the strategy context and every registered strategy
  // are all expected to expose the same worker choice strategy options.
  const expectStrategyOptions = (pool, expectedOptions) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Pool created without any option.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expectStrategyOptions(defaultPool, {
    retries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Pool created with every option overridden.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 4
  })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expectStrategyOptions(customPool, {
    retries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
276
// Synchronous test: every case is expected to throw from the constructor,
// so nothing is awaited and the callback is not declared async.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs invalid pool options with the error the constructor must throw.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { workerChoiceStrategyOptions: { retries: -1 } },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions'
      },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { queueMaxSize: 2 } },
      new Error(
        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0 } },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: -1 } },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  // Cases run in declaration order; the first failing expectation aborts the test.
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrowError(expectedError)
  }
})
465
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Expected options when runTime and elu share the same `median` flag.
  const strategyOptionsFor = (median) => ({
    retries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Expected statistics requirements: average and median are mutually exclusive.
  const requirementsFor = (median) => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })
  // Checks the pool, its strategy context, every strategy and the requirements.
  const expectPoolState = (median) => {
    const expectedOptions = strategyOptionsFor(median)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(requirementsFor(median))
  }

  // Defaults, then median switched on, then switched back off.
  expectPoolState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolState(false)

  // Each entry pairs an invalid argument with the error the setter must throw.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { retries: 'invalidChoiceRetries' },
      new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    ],
    [
      { retries: -1 },
      new RangeError(
        "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
639
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the queue is off and carries no options.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Asserts the queue is on with the given concurrency and a size of 4.
  const expectQueueEnabled = (concurrency) => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency, size: 4 })
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
664
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 1,
    size: 4
  })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({
    concurrency: 2,
    size: 4
  })

  // Each entry pairs an invalid argument with the error the setter must throw.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new RangeError(
        'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new RangeError(
        'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker node tasks concurrency: must be an integer')
    ],
    [
      { queueMaxSize: 2 },
      new Error(
        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
      )
    ],
    [
      { size: 0 },
      new RangeError(
        'Invalid worker node tasks queue size: 0 is a negative integer or zero'
      )
    ],
    [
      { size: -1 },
      new RangeError(
        'Invalid worker node tasks queue size: -1 is a negative integer or zero'
      )
    ],
    [
      { size: 0.2 },
      new TypeError('Invalid worker node tasks queue size: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
718
it('Verify that pool info is set', async () => {
  // Counters every freshly created pool is expected to report.
  const idleCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  // Fixed thread pool: minimum and maximum sizes are the same.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...idleCounters
  })
  await fixedPool.destroy()

  // Dynamic cluster pool: the expected node count matches the minimum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...idleCounters
  })
  await dynamicPool.destroy()
})
762
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // All task counters at zero, every history held in a CircularArray.
  const anyHistory = { history: expect.any(CircularArray) }
  const initialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      stolen: 0,
      failed: 0
    },
    runTime: anyHistory,
    waitTime: anyHistory,
    elu: { idle: anyHistory, active: anyHistory }
  }
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(initialUsage)
  }
  await pool.destroy()
})
796
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node must own an empty Deque that has never held a task.
  const expectEmptyTasksQueues = (pool) => {
    for (const { tasksQueue } of pool.workerNodes) {
      expect(tasksQueue).toBeDefined()
      expect(tasksQueue).toBeInstanceOf(Deque)
      expect(tasksQueue.size).toBe(0)
      expect(tasksQueue.maxSize).toBe(0)
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(clusterPool)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(threadPool)
  await threadPool.destroy()
})
822
it('Verify that pool worker info are initialized', async () => {
  // Every worker node must be a ready, non dynamic worker of the given type.
  const expectWorkerInfo = (pool, workerType) => {
    for (const node of pool.workerNodes) {
      expect(node.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }

  const clusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerInfo(clusterPool, WorkerTypes.cluster)
  await clusterPool.destroy()

  const threadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectWorkerInfo(threadPool, WorkerTypes.thread)
  await threadPool.destroy()
})
852
it('Verify that pool execute() arguments are checked', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each entry: execute() arguments and the error the returned promise must reject with.
  const invalidCalls = [
    [[undefined, 0], new TypeError('name argument must be a string')],
    [
      [undefined, ''],
      new TypeError('name argument must not be an empty string')
    ],
    [
      [undefined, undefined, {}],
      new TypeError('transferList argument must be an array')
    ]
  ]
  for (const [args, expectedError] of invalidCalls) {
    await expect(pool.execute(...args)).rejects.toThrowError(expectedError)
  }
  // An unknown task function name rejects with a plain string, not an Error.
  await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
    "Task function 'unknown' not found"
  )
  await pool.destroy()
  // After destruction the same call rejects with the destroyed pool error.
  await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
    new Error('Cannot execute a task on destroyed pool')
  )
})
875
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Expected usage for the given executed/executing counters; other counters stay at zero.
  const usageWith = (executed, executing) => {
    const anyHistory = { history: expect.any(CircularArray) }
    return {
      tasks: {
        executed,
        executing,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory,
      waitTime: anyHistory,
      elu: { idle: anyHistory, active: anyHistory }
    }
  }

  // Submit all the tasks at once, without awaiting them yet.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While the tasks are in flight each worker node reports maxMultiplier executing tasks.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once settled, the same tasks are counted as executed.
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
941
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  // Expected usage for the given executed counter; every other counter stays at zero.
  const usageWith = (executed) => {
    const anyHistory = { history: expect.any(CircularArray) }
    return {
      tasks: {
        executed,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        stolen: 0,
        failed: 0
      },
      runTime: anyHistory,
      waitTime: anyHistory,
      elu: { idle: anyHistory, active: anyHistory }
    }
  }
  // None of the measurement histories is expected to hold a sample.
  const expectEmptyHistories = ({ runTime, waitTime, elu }) => {
    expect(runTime.history.length).toBe(0)
    expect(waitTime.history.length).toBe(0)
    expect(elu.idle.history.length).toBe(0)
    expect(elu.active.history.length).toBe(0)
  }

  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(expect.any(Number)))
    expect(node.usage.tasks.executed).toBeGreaterThan(0)
    expect(node.usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(node.usage)
  }
  // Switching strategy must bring the executed counter back to zero.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const node of pool.workerNodes) {
    expect(node.usage).toStrictEqual(usageWith(0))
    expectEmptyHistories(node.usage)
  }
  await pool.destroy()
})
1021
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Collect the payload of every 'ready' event received.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, (info) => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1053
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect the payload of every 'busy' event received.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, (info) => {
    busyInfos.push(info)
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1090
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect the payload of every 'full' event received.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, (info) => {
    fullInfos.push(info)
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
1126
it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Force the pool to report back pressure whatever its real load is.
  const hasBackPressureStub = sinon.stub(pool, 'hasBackPressure').returns(true)
  // Collect the payload of every 'backPressure' event received.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, (info) => {
    backPressureInfos.push(info)
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers + 1 }, () => pool.execute())
  )
  expect(backPressureInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    queuedTasks: anyNumber,
    backPressure: true,
    stolenTasks: anyNumber,
    failedTasks: anyNumber
  })
  expect(hasBackPressureStub.called).toBe(true)
  await pool.destroy()
})
1170
it('Verify that listTaskFunctions() is working', async () => {
  // Task function names both worker files are expected to expose, in this order.
  const expectedTaskFunctions = [
    DEFAULT_TASK_NAME,
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]

  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )

  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )

  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
1198
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const input = { n: 10 }

  // Run the default task function, then each named one, strictly one after the other.
  const defaultResult = await pool.execute(input)
  expect(defaultResult).toStrictEqual({ ok: 1 })
  const serializationResult = await pool.execute(
    input,
    'jsonIntegerSerialization'
  )
  expect(serializationResult).toStrictEqual({ ok: 1 })
  const factorialResult = await pool.execute(input, 'factorial')
  expect(factorialResult).toBe(3628800)
  const fibonacciResult = await pool.execute(input, 'fibonacci')
  expect(fibonacciResult).toBe(55)

  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)

  const anyHistory = { history: expect.any(CircularArray) }
  const expectedTaskFunctionUsage = {
    tasks: {
      executed: expect.any(Number),
      executing: expect.any(Number),
      failed: 0,
      queued: 0,
      stolen: 0
    },
    runTime: anyHistory,
    waitTime: anyHistory,
    elu: { idle: anyHistory, active: anyHistory }
  }
  for (const node of pool.workerNodes) {
    expect(node.info.taskFunctions).toStrictEqual([
      DEFAULT_TASK_NAME,
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(node.taskFunctionsUsage.size).toBe(3)
    for (const taskFunctionName of pool.listTaskFunctions()) {
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(expectedTaskFunctionUsage)
      expect(
        node.getTaskFunctionWorkerUsage(taskFunctionName).tasks.executing
      ).toBeGreaterThanOrEqual(0)
    }
  }
  await pool.destroy()
})
1255 })