feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const sinon = require('sinon')
3 const {
4 DynamicClusterPool,
5 DynamicThreadPool,
6 FixedClusterPool,
7 FixedThreadPool,
8 PoolEvents,
9 PoolTypes,
10 WorkerChoiceStrategies,
11 WorkerTypes
12 } = require('../../../lib')
13 const { CircularArray } = require('../../../lib/circular-array')
14 const { Queue } = require('../../../lib/queue')
15 const { version } = require('../../../package.json')
16 const { waitPoolEvents } = require('../../test-utils')
17
18 describe('Abstract pool test suite', () => {
// Worker count used to size the pools created in this suite
const numberOfWorkers = 2

/**
 * Fixed thread pool whose isMain() always answers false.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
25
it('Simulate pool creation from a non main thread/process', () => {
  // The stub's isMain() returns false, so the constructor is expected to throw
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: (e) => console.error(e)
      }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker with the same type as the pool'
  )
})
40
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // No file path argument at all
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Empty string and non string file paths
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A path for which the "cannot find the worker file" error is expected
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
61
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool without any argument must be rejected
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
67
it('Verify that a negative number of workers is checked', () => {
  // A pool size of -1 must be rejected with a RangeError
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
78
it('Verify that a non integer number of workers is checked', () => {
  // A fractional pool size must be rejected with a TypeError
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
89
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs a pool factory with the error its sizing arguments must trigger
  const sizingCases = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ]
  ]
  for (const [createPool, expectedError] of sizingCases) {
    expect(createPool).toThrowError(expectedError)
  }
})
156
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerOptionNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // Pool built without options: checks the option values it ends up with
  const defaultStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(pool.emitter).toBeDefined()
  expect(pool.opts.enableEvents).toBe(true)
  expect(pool.opts.restartWorkerOnError).toBe(true)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
    defaultStrategyOptions
  )
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const handlerName of handlerOptionNames) {
    expect(pool.opts[handlerName]).toBeUndefined()
  }
  await pool.destroy()

  // Pool built with explicit options: the given values must be kept
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts.enableEvents).toBe(false)
  expect(pool.opts.restartWorkerOnError).toBe(false)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
    customStrategyOptions
  )
  expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const handlerName of handlerOptionNames) {
    expect(pool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await pool.destroy()
})
235
it('Verify that pool options are validated', async () => {
  // Each entry pairs invalid pool options with the error the constructor must throw
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new TypeError(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker tasks concurrency: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptionsCases) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.js',
          poolOptions
        )
    ).toThrowError(expectedError)
  }
})
319
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the options held by the pool, by the strategy context and by each
  // registered strategy, then the task statistics requirements, for the given
  // `median` setting of the runTime and elu measurements.
  const expectStrategyState = (median) => {
    const expectedOptions = () => ({
      choiceRetries: 6,
      runTime: { median },
      waitTime: { median: false },
      elu: { median }
    })
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions()
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions()
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions())
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // State right after construction
  expectStrategyState(false)
  // Switch runTime and elu to median, then back again
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyState(false)

  // Invalid options must be rejected by the setter
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
477
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the enabled flag and the queue options (undefined when none are expected)
  const expectTasksQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }
  expectTasksQueueState(false)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false)
  await pool.destroy()
})
496
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // Each entry pairs invalid tasks queue options with the error the setter must throw
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new Error(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new Error(
        'Invalid worker tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker tasks concurrency: must be an integer')
    ]
  ]
  for (const [invalidOptions, expectedError] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
526
it('Verify that pool info is set', async () => {
  // Task and busy counters expected before any task has been submitted
  const untouchedCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...untouchedCounters
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...untouchedCounters
  })
  await dynamicPool.destroy()
})
570
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Usage expected on a worker node before any task has been submitted
  const initialUsage = {
    tasks: { executed: 0, executing: 0, queued: 0, maxQueued: 0, failed: 0 },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(initialUsage)
  }
  await pool.destroy()
})
603
it('Verify that pool worker tasks queue are initialized', async () => {
  // Every worker node of the given pool must hold an empty Queue instance
  const expectEmptyTasksQueues = (checkedPool) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode.tasksQueue).toBeDefined()
      expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  // Fix: the dynamic pool was left running; destroy it like the fixed one
  await pool.destroy()
})
628
it('Verify that pool worker info are initialized', async () => {
  // Every worker node of the given pool must report a ready, non dynamic
  // worker of the expected type
  const expectInitialWorkerInfo = (checkedPool, workerType) => {
    for (const workerNode of checkedPool.workerNodes) {
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectInitialWorkerInfo(pool, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectInitialWorkerInfo(pool, WorkerTypes.thread)
  // Fix: the dynamic pool was left running; destroy it like the fixed one
  await pool.destroy()
})
657
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the usage expected on a worker node for the given task counters
  const usageWith = (executed, executing) => ({
    tasks: { executed, executing, queued: 0, maxQueued: 0, failed: 0 },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  const maxMultiplier = 2
  // Submit maxMultiplier tasks per worker without awaiting them yet
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
721
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Builds the usage expected on an idle worker node for the given executed matcher/value
  const idleUsageWith = (executed) => ({
    tasks: { executed, executing: 0, queued: 0, maxQueued: 0, failed: 0 },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that every measurement history of the given usage is empty
  const expectEmptyHistories = ({ runTime, waitTime, elu }) => {
    expect(runTime.history.length).toBe(0)
    expect(waitTime.history.length).toBe(0)
    expect(elu.idle.history.length).toBe(0)
    expect(elu.active.history.length).toBe(0)
  }
  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(idleUsageWith(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(submittedTasks)
    expectEmptyHistories(usage)
  }
  // Changing the strategy must bring the executed counter back to zero
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(idleUsageWith(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
799
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Collect the payload of every 'ready' event emitted by the pool
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, (info) => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
831
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect the payload of every 'busy' event emitted by the pool
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, (info) => {
    busyInfos.push(info)
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
868
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Collect the payload of every 'full' event emitted by the pool
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, (info) => {
    fullInfos.push(info)
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
904
// Still skipped: only the expected pool type and leftover debug comments were
// fixed here, the reason for skipping the test is not visible in this file.
it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true
    }
  )
  // Stub hasBackPressure() on each worker node: true on the first call,
  // false on every later call
  for (const workerNode of pool.workerNodes) {
    workerNode.hasBackPressure = sinon
      .stub()
      .onFirstCall()
      .returns(true)
      .returns(false)
  }
  const promises = new Set()
  let poolBackPressure = 0
  let poolInfo
  pool.emitter.on(PoolEvents.backPressure, (info) => {
    ++poolBackPressure
    poolInfo = info
  })
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  expect(poolBackPressure).toBe(1)
  expect(poolInfo).toStrictEqual({
    version,
    // Fix: the pool under test is a FixedThreadPool, not a dynamic pool
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
951
it('Verify that listTaskFunctions() is working', async () => {
  // Task function names expected from both multiple task functions worker files
  const expectedTaskFunctions = [
    'default',
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  // Fix: both pools were left running; destroy them like the other tests do
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
977
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, sequentially
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctions).toStrictEqual([
      'default',
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    // Per task function usage must be available for every listed name
    for (const name of pool.listTaskFunctions()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: expect.any(Number),
          failed: 0,
          queued: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
      ).toBeGreaterThanOrEqual(0)
    }
  }
  // Fix: the pool was left running; destroy it like the other tests do
  await pool.destroy()
})
1032 })