1 import { randomInt } from 'node:crypto'
3 import { expect } from 'expect'
5 import { CircularArray } from '../../../lib/circular-array.cjs'
11 WorkerChoiceStrategies
12 } from '../../../lib/index.cjs'
14 describe('Selection strategies test suite', () => {
// Checks that each WorkerChoiceStrategies member is strictly equal (toBe) to the
// string spelling of its own key.
18 it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
19 expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
20 expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
21 expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
22 expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
23 expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
24 expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
25 'WEIGHTED_ROUND_ROBIN'
27 expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe(
28 'INTERLEAVED_WEIGHTED_ROUND_ROBIN'
// Builds a DynamicThreadPool on the thread test worker file and expects both
// pool.opts.workerChoiceStrategy and the strategies context's
// defaultWorkerChoiceStrategy to be ROUND_ROBIN.
32 it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {
33 const pool = new DynamicThreadPool(
36 './tests/worker-files/thread/testWorker.mjs'
38 expect(pool.opts.workerChoiceStrategy).toBe(
39 WorkerChoiceStrategies.ROUND_ROBIN
41 expect(pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
42 WorkerChoiceStrategies.ROUND_ROBIN
44 // We need to clean up the resources after our test
// Iterates over every WorkerChoiceStrategies value, passes it as the
// workerChoiceStrategy option of a new FixedThreadPool, and expects the pool
// options and the context default strategy to both report that same value.
48 it('Verify available strategies are taken at pool creation', async () => {
49 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
50 const pool = new FixedThreadPool(
52 './tests/worker-files/thread/testWorker.mjs',
53 { workerChoiceStrategy }
55 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
57 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
58 ).toBe(workerChoiceStrategy)
// Runs the same check twice, first with a DynamicThreadPool and then with a
// DynamicClusterPool: for every WorkerChoiceStrategies value, the strategy is
// applied through pool.setWorkerChoiceStrategy() after construction, and the
// pool options and context default strategy must both reflect it.
63 it('Verify available strategies can be set after pool creation', async () => {
64 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
65 const pool = new DynamicThreadPool(
68 './tests/worker-files/thread/testWorker.mjs'
70 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
71 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
73 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
74 ).toBe(workerChoiceStrategy)
77 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
78 const pool = new DynamicClusterPool(
81 './tests/worker-files/cluster/testWorker.cjs'
83 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
84 expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
86 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
87 ).toBe(workerChoiceStrategy)
// For every WorkerChoiceStrategies value, builds a FixedThreadPool with that
// strategy and inspects the strategy object stored in
// workerChoiceStrategiesContext.workerChoiceStrategies. previousWorkerNodeKey
// is read before the strategy-specific checks; workerNodeVirtualTaskExecutionTime
// is read after the WEIGHTED_ROUND_ROBIN and INTERLEAVED_WEIGHTED_ROUND_ROBIN
// comparisons, and the later lines also read roundWeights.length and apply
// Number.isSafeInteger to a value taken from the strategy object.
92 it('Verify available strategies default internals at pool creation', async () => {
93 for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
94 const pool = new FixedThreadPool(
96 './tests/worker-files/thread/testWorker.mjs',
97 { workerChoiceStrategy }
100 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
105 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
107 ).previousWorkerNodeKey
110 workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
113 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
115 ).workerNodeVirtualTaskExecutionTime
118 workerChoiceStrategy ===
119 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
122 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
124 ).workerNodeVirtualTaskExecutionTime
127 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
132 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
137 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
139 ).roundWeights.length
142 Number.isSafeInteger(
143 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
// With ROUND_ROBIN selected, getPolicy() must strictly equal
// { dynamicWorkerUsage: false, dynamicWorkerReady: true } for a FixedThreadPool
// and, after `pool` is reassigned, for a DynamicThreadPool as well.
153 it('Verify ROUND_ROBIN strategy default policy', async () => {
154 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
155 let pool = new FixedThreadPool(
157 './tests/worker-files/thread/testWorker.mjs',
158 { workerChoiceStrategy }
160 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
161 dynamicWorkerUsage: false,
162 dynamicWorkerReady: true
165 pool = new DynamicThreadPool(
168 './tests/worker-files/thread/testWorker.mjs',
169 { workerChoiceStrategy }
171 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
172 dynamicWorkerUsage: false,
173 dynamicWorkerReady: true
175 // We need to clean up the resources after our test
// Reads workerChoiceStrategiesContext.getTaskStatisticsRequirements() from a
// FixedThreadPool and then from a DynamicThreadPool, both configured with
// ROUND_ROBIN. `pool` is declared with `let` because it is reassigned.
179 it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
180 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
181 let pool = new FixedThreadPool(
183 './tests/worker-files/thread/testWorker.mjs',
184 { workerChoiceStrategy }
187 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
206 pool = new DynamicThreadPool(
209 './tests/worker-files/thread/testWorker.mjs',
210 { workerChoiceStrategy }
213 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
231 // We need to clean up the resources after our test
// Submits max * maxMultiplier tasks through pool.execute() on a FixedThreadPool
// using ROUND_ROBIN, waits for all of them, then checks every worker node:
// usage must strictly match an object whose executed count is exactly
// maxMultiplier, sequentiallyStolen is 0 and histories are empty CircularArray
// instances. Finally the strategy's previousWorkerNodeKey must be the index of
// the last worker node.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
235 it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
236 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
237 const pool = new FixedThreadPool(
239 './tests/worker-files/thread/testWorker.mjs',
240 { workerChoiceStrategy }
242 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
243 const promises = new Set()
244 const maxMultiplier = 2
245 for (let i = 0; i < max * maxMultiplier; i++) {
246 promises.add(pool.execute())
248 await Promise.all(promises)
249 for (const workerNode of pool.workerNodes) {
250 expect(workerNode.usage).toStrictEqual({
252 executed: maxMultiplier,
256 sequentiallyStolen: 0,
261 history: new CircularArray()
264 history: new CircularArray()
268 history: new CircularArray()
271 history: new CircularArray()
277 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
278 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
282 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
283 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
284 ).previousWorkerNodeKey
285 ).toBe(pool.workerNodes.length - 1)
286 // We need to clean up the resources after our test
// Same scenario as the fixed-pool ROUND_ROBIN run, but on a DynamicThreadPool:
// max * maxMultiplier tasks are executed and awaited, then each worker node's
// usage is matched with executed accepted as any Number (bounded below by 0 in
// a separate assertion), sequentiallyStolen equal to 0 and empty CircularArray
// histories. The strategy's previousWorkerNodeKey must end on the last worker
// node index.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
290 it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
291 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
292 const pool = new DynamicThreadPool(
295 './tests/worker-files/thread/testWorker.mjs',
296 { workerChoiceStrategy }
298 // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
299 const promises = new Set()
300 const maxMultiplier = 2
301 for (let i = 0; i < max * maxMultiplier; i++) {
302 promises.add(pool.execute())
304 await Promise.all(promises)
305 for (const workerNode of pool.workerNodes) {
306 expect(workerNode.usage).toStrictEqual({
308 executed: expect.any(Number),
312 sequentiallyStolen: 0,
317 history: new CircularArray()
320 history: new CircularArray()
324 history: new CircularArray()
327 history: new CircularArray()
331 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
332 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
337 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
338 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
342 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
343 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
344 ).previousWorkerNodeKey
345 ).toBe(pool.workerNodes.length - 1)
346 // We need to clean up the resources after our test
// Calls pool.chooseWorkerNode() `max` times and collects the chosen workers'
// info.id values into a Set; the Set size must equal `max`, i.e. every call
// picked a different worker. The check is done on a FixedClusterPool and then
// on a FixedThreadPool, both using ROUND_ROBIN.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
350 it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
351 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
352 let pool = new FixedClusterPool(
354 './tests/worker-files/cluster/testWorker.cjs',
355 { workerChoiceStrategy }
357 let results = new Set()
358 for (let i = 0; i < max; i++) {
359 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
361 expect(results.size).toBe(max)
363 pool = new FixedThreadPool(
365 './tests/worker-files/thread/testWorker.mjs',
366 { workerChoiceStrategy }
369 for (let i = 0; i < max; i++) {
370 results.add(pool.workerNodes[pool.chooseWorkerNode()].info.id)
372 expect(results.size).toBe(max)
// Overwrites the ROUND_ROBIN strategy's nextWorkerNodeKey and
// previousWorkerNodeKey with random integers from randomInt(1, max - 1), calls
// pool.setWorkerChoiceStrategy() with the strategy that is already active, and
// then reads both fields back. The sequence is performed on a FixedThreadPool
// and repeated on a DynamicThreadPool.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
376 it("Verify ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
377 const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
378 let pool = new FixedThreadPool(
380 './tests/worker-files/thread/testWorker.mjs',
381 { workerChoiceStrategy }
383 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
384 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
385 ).nextWorkerNodeKey = randomInt(1, max - 1)
386 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
387 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
388 ).previousWorkerNodeKey = randomInt(1, max - 1)
389 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
391 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
392 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
396 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
397 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
398 ).previousWorkerNodeKey
401 pool = new DynamicThreadPool(
404 './tests/worker-files/thread/testWorker.mjs',
405 { workerChoiceStrategy }
407 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
408 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
409 ).nextWorkerNodeKey = randomInt(1, max - 1)
410 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
411 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
412 ).previousWorkerNodeKey = randomInt(1, max - 1)
413 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
415 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
416 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
420 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
421 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
422 ).previousWorkerNodeKey
424 // We need to clean up the resources after our test
// With LEAST_USED selected, getPolicy() must strictly equal
// { dynamicWorkerUsage: false, dynamicWorkerReady: true } for a FixedThreadPool
// and, after `pool` is reassigned, for a DynamicThreadPool as well.
428 it('Verify LEAST_USED strategy default policy', async () => {
429 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
430 let pool = new FixedThreadPool(
432 './tests/worker-files/thread/testWorker.mjs',
433 { workerChoiceStrategy }
435 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
436 dynamicWorkerUsage: false,
437 dynamicWorkerReady: true
440 pool = new DynamicThreadPool(
443 './tests/worker-files/thread/testWorker.mjs',
444 { workerChoiceStrategy }
446 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
447 dynamicWorkerUsage: false,
448 dynamicWorkerReady: true
450 // We need to clean up the resources after our test
// Reads workerChoiceStrategiesContext.getTaskStatisticsRequirements() from a
// FixedThreadPool and then from a DynamicThreadPool, both configured with
// LEAST_USED. `pool` is declared with `let` because it is reassigned.
454 it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
455 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
456 let pool = new FixedThreadPool(
458 './tests/worker-files/thread/testWorker.mjs',
459 { workerChoiceStrategy }
462 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool = new DynamicThreadPool(
484 './tests/worker-files/thread/testWorker.mjs',
485 { workerChoiceStrategy }
488 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
506 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a FixedThreadPool using LEAST_USED and
// awaits them all. Each worker node's usage is then matched with executed
// accepted as any Number (bounded below by 0 in a separate assertion),
// sequentiallyStolen equal to 0 and empty CircularArray histories. The
// strategy's node-key internals are only required to be numbers, not specific
// indexes.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
510 it('Verify LEAST_USED strategy can be run in a fixed pool', async () => {
511 const pool = new FixedThreadPool(
513 './tests/worker-files/thread/testWorker.mjs',
514 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
516 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
517 const promises = new Set()
518 const maxMultiplier = 2
519 for (let i = 0; i < max * maxMultiplier; i++) {
520 promises.add(pool.execute())
522 await Promise.all(promises)
523 for (const workerNode of pool.workerNodes) {
524 expect(workerNode.usage).toStrictEqual({
526 executed: expect.any(Number),
530 sequentiallyStolen: 0,
535 history: new CircularArray()
538 history: new CircularArray()
542 history: new CircularArray()
545 history: new CircularArray()
549 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
550 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
555 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
556 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
558 ).toEqual(expect.any(Number))
560 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
561 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
562 ).previousWorkerNodeKey
563 ).toEqual(expect.any(Number))
564 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a DynamicThreadPool using LEAST_USED
// and awaits them all. Each worker node's usage is then matched with executed
// accepted as any Number (bounded below by 0 in a separate assertion),
// sequentiallyStolen equal to 0 and empty CircularArray histories. The
// strategy's node-key internals are only required to be numbers, not specific
// indexes.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
568 it('Verify LEAST_USED strategy can be run in a dynamic pool', async () => {
569 const pool = new DynamicThreadPool(
572 './tests/worker-files/thread/testWorker.mjs',
573 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED }
575 // TODO: Create a better test to cover `LeastUsedWorkerChoiceStrategy#choose`
576 const promises = new Set()
577 const maxMultiplier = 2
578 for (let i = 0; i < max * maxMultiplier; i++) {
579 promises.add(pool.execute())
581 await Promise.all(promises)
582 for (const workerNode of pool.workerNodes) {
583 expect(workerNode.usage).toStrictEqual({
585 executed: expect.any(Number),
589 sequentiallyStolen: 0,
594 history: new CircularArray()
597 history: new CircularArray()
601 history: new CircularArray()
604 history: new CircularArray()
608 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
609 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
614 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
615 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
617 ).toEqual(expect.any(Number))
619 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
620 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
621 ).previousWorkerNodeKey
622 ).toEqual(expect.any(Number))
623 // We need to clean up the resources after our test
// With LEAST_BUSY selected, getPolicy() must strictly equal
// { dynamicWorkerUsage: false, dynamicWorkerReady: true } for a FixedThreadPool
// and, after `pool` is reassigned, for a DynamicThreadPool as well.
627 it('Verify LEAST_BUSY strategy default policy', async () => {
628 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
629 let pool = new FixedThreadPool(
631 './tests/worker-files/thread/testWorker.mjs',
632 { workerChoiceStrategy }
634 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
635 dynamicWorkerUsage: false,
636 dynamicWorkerReady: true
639 pool = new DynamicThreadPool(
642 './tests/worker-files/thread/testWorker.mjs',
643 { workerChoiceStrategy }
645 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
646 dynamicWorkerUsage: false,
647 dynamicWorkerReady: true
649 // We need to clean up the resources after our test
// Reads workerChoiceStrategiesContext.getTaskStatisticsRequirements() from a
// FixedThreadPool and then from a DynamicThreadPool, both configured with
// LEAST_BUSY. `pool` is declared with `let` because it is reassigned.
653 it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
654 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
655 let pool = new FixedThreadPool(
657 './tests/worker-files/thread/testWorker.mjs',
658 { workerChoiceStrategy }
661 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
680 pool = new DynamicThreadPool(
683 './tests/worker-files/thread/testWorker.mjs',
684 { workerChoiceStrategy }
687 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
705 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a FixedThreadPool using LEAST_BUSY and
// awaits them all. For each worker node, usage.runTime and usage.waitTime are
// matched loosely (objectContaining with a CircularArray history of any
// content), while the remaining histories must be empty CircularArray
// instances. runTime.aggregate and waitTime.aggregate are checked
// conditionally: a nullish value must be strictly undefined, otherwise it must
// be greater than 0. The strategy's node-key internals only need to be numbers.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
709 it('Verify LEAST_BUSY strategy can be run in a fixed pool', async () => {
710 const pool = new FixedThreadPool(
712 './tests/worker-files/thread/testWorker.mjs',
713 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
715 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
716 const promises = new Set()
717 const maxMultiplier = 2
718 for (let i = 0; i < max * maxMultiplier; i++) {
719 promises.add(pool.execute())
721 await Promise.all(promises)
722 for (const workerNode of pool.workerNodes) {
723 expect(workerNode.usage).toStrictEqual({
725 executed: expect.any(Number),
729 sequentiallyStolen: 0,
733 runTime: expect.objectContaining({
734 history: expect.any(CircularArray)
736 waitTime: expect.objectContaining({
737 history: expect.any(CircularArray)
741 history: new CircularArray()
744 history: new CircularArray()
748 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
749 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
752 if (workerNode.usage.runTime.aggregate == null) {
753 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
755 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
757 if (workerNode.usage.waitTime.aggregate == null) {
758 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
760 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
764 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
765 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
767 ).toEqual(expect.any(Number))
769 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
770 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
771 ).previousWorkerNodeKey
772 ).toEqual(expect.any(Number))
773 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a DynamicThreadPool using LEAST_BUSY
// and awaits them all. For each worker node, usage.runTime and usage.waitTime
// are matched loosely (objectContaining with a CircularArray history of any
// content), while the remaining histories must be empty CircularArray
// instances. runTime.aggregate and waitTime.aggregate are checked
// conditionally: a nullish value must be strictly undefined, otherwise it must
// be greater than 0. The strategy's node-key internals only need to be numbers.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
777 it('Verify LEAST_BUSY strategy can be run in a dynamic pool', async () => {
778 const pool = new DynamicThreadPool(
781 './tests/worker-files/thread/testWorker.mjs',
782 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_BUSY }
784 // TODO: Create a better test to cover `LeastBusyWorkerChoiceStrategy#choose`
785 const promises = new Set()
786 const maxMultiplier = 2
787 for (let i = 0; i < max * maxMultiplier; i++) {
788 promises.add(pool.execute())
790 await Promise.all(promises)
791 for (const workerNode of pool.workerNodes) {
792 expect(workerNode.usage).toStrictEqual({
794 executed: expect.any(Number),
798 sequentiallyStolen: 0,
802 runTime: expect.objectContaining({
803 history: expect.any(CircularArray)
805 waitTime: expect.objectContaining({
806 history: expect.any(CircularArray)
810 history: new CircularArray()
813 history: new CircularArray()
817 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
818 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
821 if (workerNode.usage.runTime.aggregate == null) {
822 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
824 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
826 if (workerNode.usage.waitTime.aggregate == null) {
827 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
829 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
833 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
834 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
836 ).toEqual(expect.any(Number))
838 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
839 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
840 ).previousWorkerNodeKey
841 ).toEqual(expect.any(Number))
842 // We need to clean up the resources after our test
// With LEAST_ELU selected, getPolicy() must strictly equal
// { dynamicWorkerUsage: false, dynamicWorkerReady: true } for a FixedThreadPool
// and, after `pool` is reassigned, for a DynamicThreadPool as well.
846 it('Verify LEAST_ELU strategy default policy', async () => {
847 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
848 let pool = new FixedThreadPool(
850 './tests/worker-files/thread/testWorker.mjs',
851 { workerChoiceStrategy }
853 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
854 dynamicWorkerUsage: false,
855 dynamicWorkerReady: true
858 pool = new DynamicThreadPool(
861 './tests/worker-files/thread/testWorker.mjs',
862 { workerChoiceStrategy }
864 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
865 dynamicWorkerUsage: false,
866 dynamicWorkerReady: true
868 // We need to clean up the resources after our test
// Reads workerChoiceStrategiesContext.getTaskStatisticsRequirements() from a
// FixedThreadPool and then from a DynamicThreadPool, both configured with
// LEAST_ELU. `pool` is declared with `let` because it is reassigned.
872 it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
873 const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
874 let pool = new FixedThreadPool(
876 './tests/worker-files/thread/testWorker.mjs',
877 { workerChoiceStrategy }
880 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
899 pool = new DynamicThreadPool(
902 './tests/worker-files/thread/testWorker.mjs',
903 { workerChoiceStrategy }
906 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
924 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a FixedThreadPool using LEAST_ELU and
// awaits them all. For each worker node, usage.elu.idle and usage.elu.active
// are matched loosely (objectContaining with a CircularArray history of any
// content), while the other visible histories must be empty CircularArray
// instances. ELU figures are checked conditionally: a nullish value must be
// strictly undefined; otherwise active.aggregate must be > 0, idle.aggregate
// must be >= 0 and utilization must lie in [0, 1]. The strategy's node-key
// internals only need to be numbers.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
928 it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
929 const pool = new FixedThreadPool(
931 './tests/worker-files/thread/testWorker.mjs',
932 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
934 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
935 const promises = new Set()
936 const maxMultiplier = 2
937 for (let i = 0; i < max * maxMultiplier; i++) {
938 promises.add(pool.execute())
940 await Promise.all(promises)
941 for (const workerNode of pool.workerNodes) {
942 expect(workerNode.usage).toStrictEqual({
944 executed: expect.any(Number),
948 sequentiallyStolen: 0,
953 history: new CircularArray()
956 history: new CircularArray()
958 elu: expect.objectContaining({
959 idle: expect.objectContaining({
960 history: expect.any(CircularArray)
962 active: expect.objectContaining({
963 history: expect.any(CircularArray)
967 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
968 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
971 if (workerNode.usage.elu.active.aggregate == null) {
972 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
974 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
976 if (workerNode.usage.elu.idle.aggregate == null) {
977 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
979 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
981 if (workerNode.usage.elu.utilization == null) {
982 expect(workerNode.usage.elu.utilization).toBeUndefined()
984 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
985 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
989 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
990 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
992 ).toEqual(expect.any(Number))
994 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
995 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
996 ).previousWorkerNodeKey
997 ).toEqual(expect.any(Number))
998 // We need to clean up the resources after our test
// Executes max * maxMultiplier tasks on a DynamicThreadPool using LEAST_ELU and
// awaits them all. For each worker node, usage.elu.idle and usage.elu.active
// are matched loosely (objectContaining with a CircularArray history of any
// content), while the other visible histories must be empty CircularArray
// instances. ELU figures are checked conditionally: a nullish value must be
// strictly undefined; otherwise active.aggregate must be > 0, idle.aggregate
// must be >= 0 and utilization must lie in [0, 1]. The strategy's node-key
// internals only need to be numbers. The pool is destroyed at the end.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
1002 it('Verify LEAST_ELU strategy can be run in a dynamic pool', async () => {
1003 const pool = new DynamicThreadPool(
1006 './tests/worker-files/thread/testWorker.mjs',
1007 { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
1009 // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
1010 const promises = new Set()
1011 const maxMultiplier = 2
1012 for (let i = 0; i < max * maxMultiplier; i++) {
1013 promises.add(pool.execute())
1015 await Promise.all(promises)
1016 for (const workerNode of pool.workerNodes) {
1017 expect(workerNode.usage).toStrictEqual({
1019 executed: expect.any(Number),
1023 sequentiallyStolen: 0,
1028 history: new CircularArray()
1031 history: new CircularArray()
1033 elu: expect.objectContaining({
1034 idle: expect.objectContaining({
1035 history: expect.any(CircularArray)
1037 active: expect.objectContaining({
1038 history: expect.any(CircularArray)
1042 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1043 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1046 if (workerNode.usage.elu.active.aggregate == null) {
1047 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1049 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1051 if (workerNode.usage.elu.idle.aggregate == null) {
1052 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1054 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1056 if (workerNode.usage.elu.utilization == null) {
1057 expect(workerNode.usage.elu.utilization).toBeUndefined()
1059 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1060 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1064 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1065 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1067 ).toEqual(expect.any(Number))
1069 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1070 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1071 ).previousWorkerNodeKey
1072 ).toEqual(expect.any(Number))
1073 // We need to clean up the resources after our test
1074 await pool.destroy()
// With FAIR_SHARE selected, getPolicy() must strictly equal
// { dynamicWorkerUsage: false, dynamicWorkerReady: true } for a FixedThreadPool
// and for a DynamicThreadPool. The first pool is destroyed before `pool` is
// reassigned, and the second one is destroyed at the end of the test.
1077 it('Verify FAIR_SHARE strategy default policy', async () => {
1078 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1079 let pool = new FixedThreadPool(
1081 './tests/worker-files/thread/testWorker.mjs',
1082 { workerChoiceStrategy }
1084 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1085 dynamicWorkerUsage: false,
1086 dynamicWorkerReady: true
1088 await pool.destroy()
1089 pool = new DynamicThreadPool(
1092 './tests/worker-files/thread/testWorker.mjs',
1093 { workerChoiceStrategy }
1095 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1096 dynamicWorkerUsage: false,
1097 dynamicWorkerReady: true
1099 // We need to clean up the resources after our test
1100 await pool.destroy()
// Reads workerChoiceStrategiesContext.getTaskStatisticsRequirements() from a
// FixedThreadPool and then from a DynamicThreadPool, both configured with
// FAIR_SHARE. The first pool is destroyed before `pool` is reassigned, and the
// second one is destroyed at the end of the test.
1103 it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
1104 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1105 let pool = new FixedThreadPool(
1107 './tests/worker-files/thread/testWorker.mjs',
1108 { workerChoiceStrategy }
1111 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1129 await pool.destroy()
1130 pool = new DynamicThreadPool(
1133 './tests/worker-files/thread/testWorker.mjs',
1134 { workerChoiceStrategy }
1137 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1155 // We need to clean up the resources after our test
1156 await pool.destroy()
// Executes max * maxMultiplier tasks on a FixedThreadPool using FAIR_SHARE and
// awaits them all. For each worker node, usage.runTime, usage.waitTime,
// usage.elu.idle and usage.elu.active are matched loosely (objectContaining
// with a CircularArray history of any content). Run time, wait time
// (aggregate and average) and ELU figures are checked conditionally: a nullish
// value must be strictly undefined, otherwise it must satisfy the numeric bound
// on the following line (utilization must lie in [0, 1]). Each node's
// strategyData.virtualTaskEndTimestamp must be greater than 0, and the
// strategy's node-key internals only need to be numbers. The pool is destroyed
// at the end.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
1159 it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
1160 const pool = new FixedThreadPool(
1162 './tests/worker-files/thread/testWorker.mjs',
1163 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1165 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1166 const promises = new Set()
1167 const maxMultiplier = 2
1168 for (let i = 0; i < max * maxMultiplier; i++) {
1169 promises.add(pool.execute())
1171 await Promise.all(promises)
1172 for (const workerNode of pool.workerNodes) {
1173 expect(workerNode.usage).toStrictEqual({
1175 executed: expect.any(Number),
1179 sequentiallyStolen: 0,
1183 runTime: expect.objectContaining({
1184 history: expect.any(CircularArray)
1186 waitTime: expect.objectContaining({
1187 history: expect.any(CircularArray)
1189 elu: expect.objectContaining({
1190 idle: expect.objectContaining({
1191 history: expect.any(CircularArray)
1193 active: expect.objectContaining({
1194 history: expect.any(CircularArray)
1198 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1199 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1202 if (workerNode.usage.runTime.aggregate == null) {
1203 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1205 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1207 if (workerNode.usage.runTime.average == null) {
1208 expect(workerNode.usage.runTime.average).toBeUndefined()
1210 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1212 if (workerNode.usage.waitTime.aggregate == null) {
1213 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1215 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1217 if (workerNode.usage.waitTime.average == null) {
1218 expect(workerNode.usage.waitTime.average).toBeUndefined()
1220 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
1222 if (workerNode.usage.elu.active.aggregate == null) {
1223 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1225 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1227 if (workerNode.usage.elu.idle.aggregate == null) {
1228 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1230 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1232 if (workerNode.usage.elu.utilization == null) {
1233 expect(workerNode.usage.elu.utilization).toBeUndefined()
1235 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1236 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1238 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1241 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1242 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1244 ).toEqual(expect.any(Number))
1246 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1247 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1248 ).previousWorkerNodeKey
1249 ).toEqual(expect.any(Number))
1250 // We need to clean up the resources after our test
1251 await pool.destroy()
// Executes max * maxMultiplier tasks on a DynamicThreadPool using FAIR_SHARE
// and awaits them all. For each worker node, usage.runTime, usage.waitTime,
// usage.elu.idle and usage.elu.active are matched loosely (objectContaining
// with a CircularArray history of any content). Run time, wait time
// (aggregate and average) and ELU figures are checked conditionally: a nullish
// value must be strictly undefined, otherwise it must satisfy the numeric bound
// on the following line (utilization must lie in [0, 1]). Each node's
// strategyData.virtualTaskEndTimestamp must be greater than 0, and the
// strategy's node-key internals only need to be numbers. The pool is destroyed
// at the end.
// NOTE(review): `max` is not declared inside this test; presumably it comes
// from the enclosing scope - confirm.
1254 it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
1255 const pool = new DynamicThreadPool(
1258 './tests/worker-files/thread/testWorker.mjs',
1259 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
1261 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
1262 const promises = new Set()
1263 const maxMultiplier = 2
1264 for (let i = 0; i < max * maxMultiplier; i++) {
1265 promises.add(pool.execute())
1267 await Promise.all(promises)
1268 for (const workerNode of pool.workerNodes) {
1269 expect(workerNode.usage).toStrictEqual({
1271 executed: expect.any(Number),
1275 sequentiallyStolen: 0,
1279 runTime: expect.objectContaining({
1280 history: expect.any(CircularArray)
1282 waitTime: expect.objectContaining({
1283 history: expect.any(CircularArray)
1285 elu: expect.objectContaining({
1286 idle: expect.objectContaining({
1287 history: expect.any(CircularArray)
1289 active: expect.objectContaining({
1290 history: expect.any(CircularArray)
1294 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1295 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1298 if (workerNode.usage.runTime.aggregate == null) {
1299 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1301 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1303 if (workerNode.usage.runTime.average == null) {
1304 expect(workerNode.usage.runTime.average).toBeUndefined()
1306 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1308 if (workerNode.usage.waitTime.aggregate == null) {
1309 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1311 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1313 if (workerNode.usage.waitTime.average == null) {
1314 expect(workerNode.usage.waitTime.average).toBeUndefined()
1316 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
1318 if (workerNode.usage.elu.active.aggregate == null) {
1319 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1321 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1323 if (workerNode.usage.elu.idle.aggregate == null) {
1324 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1326 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
1328 if (workerNode.usage.elu.utilization == null) {
1329 expect(workerNode.usage.elu.utilization).toBeUndefined()
1331 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1332 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1334 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1337 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1338 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1340 ).toEqual(expect.any(Number))
1342 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1343 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1344 ).previousWorkerNodeKey
1345 ).toEqual(expect.any(Number))
1346 // We need to clean up the resources after our test
1347 await pool.destroy()
// Runs the FAIR_SHARE strategy on a DynamicThreadPool configured with
// `runTime: { median: true }`, then inspects every worker node's usage
// statistics and the strategy's own bookkeeping.
1350 it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median runtime statistic', async () => {
1351 const pool = new DynamicThreadPool(
1354 './tests/worker-files/thread/testWorker.mjs',
1356 workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
1357 workerChoiceStrategyOptions: {
1358 runTime: { median: true }
1362 // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
1363 const promises = new Set()
1364 const maxMultiplier = 2
1365 for (let i = 0; i < max * maxMultiplier; i++) {
1366 promises.add(pool.execute())
1368 await Promise.all(promises)
1369 for (const workerNode of pool.workerNodes) {
// Shape check: the run time, wait time and ELU (idle/active) entries must
// each carry a CircularArray history; the executed count only has to be a number.
1370 expect(workerNode.usage).toStrictEqual({
1372 executed: expect.any(Number),
1376 sequentiallyStolen: 0,
1380 runTime: expect.objectContaining({
1381 history: expect.any(CircularArray)
1383 waitTime: expect.objectContaining({
1384 history: expect.any(CircularArray)
1386 elu: expect.objectContaining({
1387 idle: expect.objectContaining({
1388 history: expect.any(CircularArray)
1390 active: expect.objectContaining({
1391 history: expect.any(CircularArray)
1395 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1396 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
// This variant checks `median` (not `average`) for run time and wait time.
1399 if (workerNode.usage.runTime.aggregate == null) {
1400 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1402 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1404 if (workerNode.usage.runTime.median == null) {
1405 expect(workerNode.usage.runTime.median).toBeUndefined()
1407 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1409 if (workerNode.usage.waitTime.aggregate == null) {
1410 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1412 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1414 if (workerNode.usage.waitTime.median == null) {
1415 expect(workerNode.usage.waitTime.median).toBeUndefined()
1417 expect(workerNode.usage.waitTime.median).toBeGreaterThan(0)
1419 if (workerNode.usage.elu.active.aggregate == null) {
1420 expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
1422 expect(workerNode.usage.elu.active.aggregate).toBeGreaterThan(0)
1424 if (workerNode.usage.elu.idle.aggregate == null) {
1425 expect(workerNode.usage.elu.idle.aggregate).toBeUndefined()
1427 expect(workerNode.usage.elu.idle.aggregate).toBeGreaterThanOrEqual(0)
// A recorded ELU utilization must lie within [0, 1].
1429 if (workerNode.usage.elu.utilization == null) {
1430 expect(workerNode.usage.elu.utilization).toBeUndefined()
1432 expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
1433 expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
1435 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
// The strategy instance registered for the default strategy must expose
// numeric worker node keys.
1438 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1439 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1441 ).toEqual(expect.any(Number))
1443 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1444 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1445 ).previousWorkerNodeKey
1446 ).toEqual(expect.any(Number))
1447 // We need to clean up the resources after our test
1448 await pool.destroy()
// Seeds every worker node's `strategyData.virtualTaskEndTimestamp` with
// `performance.now()`, switches the pool to FAIR_SHARE through
// `setWorkerChoiceStrategy()`, and checks the timestamp is still > 0.
// The scenario runs on a FixedThreadPool first, then on a DynamicThreadPool.
1451 it("Verify FAIR_SHARE strategy internals aren't reset after setting it", async () => {
1452 const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
1453 let pool = new FixedThreadPool(
1455 './tests/worker-files/thread/testWorker.mjs'
1457 for (const workerNode of pool.workerNodes) {
1458 workerNode.strategyData = {
1459 virtualTaskEndTimestamp: performance.now()
1462 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1463 for (const workerNode of pool.workerNodes) {
1464 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1466 await pool.destroy()
1467 pool = new DynamicThreadPool(
1470 './tests/worker-files/thread/testWorker.mjs'
1472 for (const workerNode of pool.workerNodes) {
1473 workerNode.strategyData = {
1474 virtualTaskEndTimestamp: performance.now()
1477 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1478 for (const workerNode of pool.workerNodes) {
1479 expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
1481 // We need to clean up the resources after our test
1482 await pool.destroy()
// Checks that the strategies context reports the policy
// `{ dynamicWorkerUsage: false, dynamicWorkerReady: true }` when
// WEIGHTED_ROUND_ROBIN is selected at pool creation, on a fixed pool and then
// on a dynamic pool.
1485 it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1486 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1487 let pool = new FixedThreadPool(
1489 './tests/worker-files/thread/testWorker.mjs',
1490 { workerChoiceStrategy }
1492 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1493 dynamicWorkerUsage: false,
1494 dynamicWorkerReady: true
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1496 await pool.destroy()
1497 pool = new DynamicThreadPool(
1500 './tests/worker-files/thread/testWorker.mjs',
1501 { workerChoiceStrategy }
1503 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1504 dynamicWorkerUsage: false,
1505 dynamicWorkerReady: true
1507 // We need to clean up the resources after our test
1508 await pool.destroy()
// Queries `getTaskStatisticsRequirements()` on the strategies context for a
// pool created with WEIGHTED_ROUND_ROBIN, first on a FixedThreadPool and then
// on a DynamicThreadPool.
1511 it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1512 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1513 let pool = new FixedThreadPool(
1515 './tests/worker-files/thread/testWorker.mjs',
1516 { workerChoiceStrategy }
1519 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1537 await pool.destroy()
1538 pool = new DynamicThreadPool(
1541 './tests/worker-files/thread/testWorker.mjs',
1542 { workerChoiceStrategy }
1545 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1563 // We need to clean up the resources after our test
1564 await pool.destroy()
// Runs the WEIGHTED_ROUND_ROBIN strategy on a FixedThreadPool, then inspects
// every worker node's usage statistics and the strategy's own bookkeeping.
1567 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1568 const pool = new FixedThreadPool(
1570 './tests/worker-files/thread/testWorker.mjs',
1571 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1573 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
1574 const promises = new Set()
1575 const maxMultiplier = 2
1576 for (let i = 0; i < max * maxMultiplier; i++) {
1577 promises.add(pool.execute())
1579 await Promise.all(promises)
1580 for (const workerNode of pool.workerNodes) {
// Shape check: run time and wait time must carry a CircularArray history,
// while two further histories must equal a freshly constructed CircularArray.
1581 expect(workerNode.usage).toStrictEqual({
1583 executed: expect.any(Number),
1587 sequentiallyStolen: 0,
1591 runTime: expect.objectContaining({
1592 history: expect.any(CircularArray)
1594 waitTime: expect.objectContaining({
1595 history: expect.any(CircularArray)
1599 history: new CircularArray()
1602 history: new CircularArray()
1606 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1607 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
1610 if (workerNode.usage.runTime.aggregate == null) {
1611 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1613 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1615 if (workerNode.usage.runTime.average == null) {
1616 expect(workerNode.usage.runTime.average).toBeUndefined()
1618 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1620 if (workerNode.usage.waitTime.aggregate == null) {
1621 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1623 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1625 if (workerNode.usage.waitTime.average == null) {
1626 expect(workerNode.usage.waitTime.average).toBeUndefined()
1628 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
// Internals of the strategy instance registered for the default strategy,
// including a non-negative `workerNodeVirtualTaskExecutionTime`.
1632 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1633 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1637 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1638 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1639 ).previousWorkerNodeKey
1642 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1643 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1644 ).workerNodeVirtualTaskExecutionTime
1645 ).toBeGreaterThanOrEqual(0)
1646 // We need to clean up the resources after our test
1647 await pool.destroy()
// Runs the WEIGHTED_ROUND_ROBIN strategy on a DynamicThreadPool, then inspects
// every worker node's usage statistics and the strategy's own bookkeeping.
1650 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
1651 const pool = new DynamicThreadPool(
1654 './tests/worker-files/thread/testWorker.mjs',
1655 { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
1657 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
1658 const promises = new Set()
1659 const maxMultiplier = 2
1660 for (let i = 0; i < max * maxMultiplier; i++) {
1661 promises.add(pool.execute())
1663 await Promise.all(promises)
1664 for (const workerNode of pool.workerNodes) {
// Shape check: run time and wait time must carry a CircularArray history,
// while two further histories must equal a freshly constructed CircularArray.
1665 expect(workerNode.usage).toStrictEqual({
1667 executed: expect.any(Number),
1671 sequentiallyStolen: 0,
1675 runTime: expect.objectContaining({
1676 history: expect.any(CircularArray)
1678 waitTime: expect.objectContaining({
1679 history: expect.any(CircularArray)
1683 history: new CircularArray()
1686 history: new CircularArray()
1690 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1691 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
1694 if (workerNode.usage.runTime.aggregate == null) {
1695 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1697 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1699 if (workerNode.usage.runTime.average == null) {
1700 expect(workerNode.usage.runTime.average).toBeUndefined()
1702 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
1704 if (workerNode.usage.waitTime.aggregate == null) {
1705 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1707 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1709 if (workerNode.usage.waitTime.average == null) {
1710 expect(workerNode.usage.waitTime.average).toBeUndefined()
1712 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
// Internals of the strategy instance registered for the default strategy,
// including a non-negative `workerNodeVirtualTaskExecutionTime`.
1716 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1717 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1721 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1722 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1723 ).previousWorkerNodeKey
1726 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1727 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1728 ).workerNodeVirtualTaskExecutionTime
1729 ).toBeGreaterThanOrEqual(0)
1730 // We need to clean up the resources after our test
1731 await pool.destroy()
// Runs the WEIGHTED_ROUND_ROBIN strategy on a DynamicThreadPool configured
// with `runTime: { median: true }`, then inspects every worker node's usage
// statistics and the strategy's own bookkeeping.
1734 it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median runtime statistic', async () => {
1735 const pool = new DynamicThreadPool(
1738 './tests/worker-files/thread/testWorker.mjs',
1740 workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
1741 workerChoiceStrategyOptions: {
1742 runTime: { median: true }
1746 // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
1747 const promises = new Set()
1748 const maxMultiplier = 2
1749 for (let i = 0; i < max * maxMultiplier; i++) {
1750 promises.add(pool.execute())
1752 await Promise.all(promises)
1753 for (const workerNode of pool.workerNodes) {
// Shape check: run time and wait time must carry a CircularArray history,
// while two further histories must equal a freshly constructed CircularArray.
1754 expect(workerNode.usage).toStrictEqual({
1756 executed: expect.any(Number),
1760 sequentiallyStolen: 0,
1764 runTime: expect.objectContaining({
1765 history: expect.any(CircularArray)
1767 waitTime: expect.objectContaining({
1768 history: expect.any(CircularArray)
1772 history: new CircularArray()
1775 history: new CircularArray()
1779 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
1780 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
// This variant checks `median` (not `average`) for run time and wait time.
1783 if (workerNode.usage.runTime.aggregate == null) {
1784 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
1786 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
1788 if (workerNode.usage.runTime.median == null) {
1789 expect(workerNode.usage.runTime.median).toBeUndefined()
1791 expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
1793 if (workerNode.usage.waitTime.aggregate == null) {
1794 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
1796 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
1798 if (workerNode.usage.waitTime.median == null) {
1799 expect(workerNode.usage.waitTime.median).toBeUndefined()
1801 expect(workerNode.usage.waitTime.median).toBeGreaterThan(0)
// Internals of the strategy instance registered for the default strategy,
// including a non-negative `workerNodeVirtualTaskExecutionTime`.
1805 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1806 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1810 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1811 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1812 ).previousWorkerNodeKey
1815 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1816 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1817 ).workerNodeVirtualTaskExecutionTime
1818 ).toBeGreaterThanOrEqual(0)
1819 // We need to clean up the resources after our test
1820 await pool.destroy()
// Overwrites `nextWorkerNodeKey`, `previousWorkerNodeKey` and
// `workerNodeVirtualTaskRunTime` on the WEIGHTED_ROUND_ROBIN strategy instance
// with random values, re-applies the same strategy through
// `setWorkerChoiceStrategy()`, and checks the seeded values are still there.
// The scenario runs on a FixedThreadPool first, then on a DynamicThreadPool.
// NOTE(review): other tests in this suite read
// `workerNodeVirtualTaskExecutionTime`, whereas this one writes and reads
// `workerNodeVirtualTaskRunTime` — confirm which property the strategy really
// defines; if it is the former, this round-trip only exercises an ad-hoc property.
1823 it("Verify WEIGHTED_ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
1824 const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
1825 let pool = new FixedThreadPool(
1827 './tests/worker-files/thread/testWorker.mjs',
1828 { workerChoiceStrategy }
// randomInt's lower bound is inclusive, so every seeded key is >= 1 and the
// seeded run time is >= 100; the `> 0` / `> 99` assertions below rely on that.
1830 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1831 workerChoiceStrategy
1832 ).nextWorkerNodeKey = randomInt(1, max - 1)
1833 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1834 workerChoiceStrategy
1835 ).previousWorkerNodeKey = randomInt(1, max - 1)
1836 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1837 workerChoiceStrategy
1838 ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
1839 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1841 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1842 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1844 ).toBeGreaterThan(0)
1846 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1847 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1848 ).previousWorkerNodeKey
1849 ).toBeGreaterThan(0)
1851 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1852 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1853 ).workerNodeVirtualTaskRunTime
1854 ).toBeGreaterThan(99)
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1855 await pool.destroy()
1856 pool = new DynamicThreadPool(
1859 './tests/worker-files/thread/testWorker.mjs',
1860 { workerChoiceStrategy }
1862 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1863 workerChoiceStrategy
1864 ).nextWorkerNodeKey = randomInt(1, max - 1)
1865 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1866 workerChoiceStrategy
1867 ).previousWorkerNodeKey = randomInt(1, max - 1)
1868 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1869 workerChoiceStrategy
1870 ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
1871 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
1873 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1874 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1876 ).toBeGreaterThan(0)
1878 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1879 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1880 ).previousWorkerNodeKey
1881 ).toBeGreaterThan(0)
1883 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
1884 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
1885 ).workerNodeVirtualTaskRunTime
1886 ).toBeGreaterThan(99)
1887 // We need to clean up the resources after our test
1888 await pool.destroy()
// Checks that the strategies context reports the policy
// `{ dynamicWorkerUsage: false, dynamicWorkerReady: true }` when
// INTERLEAVED_WEIGHTED_ROUND_ROBIN is selected at pool creation, on a fixed
// pool and then on a dynamic pool.
1891 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
1892 const workerChoiceStrategy =
1893 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1894 let pool = new FixedThreadPool(
1896 './tests/worker-files/thread/testWorker.mjs',
1897 { workerChoiceStrategy }
1899 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1900 dynamicWorkerUsage: false,
1901 dynamicWorkerReady: true
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1903 await pool.destroy()
1904 pool = new DynamicThreadPool(
1907 './tests/worker-files/thread/testWorker.mjs',
1908 { workerChoiceStrategy }
1910 expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
1911 dynamicWorkerUsage: false,
1912 dynamicWorkerReady: true
1914 // We need to clean up the resources after our test
1915 await pool.destroy()
// Queries `getTaskStatisticsRequirements()` on the strategies context for a
// pool created with INTERLEAVED_WEIGHTED_ROUND_ROBIN, first on a
// FixedThreadPool and then on a DynamicThreadPool.
1918 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
1919 const workerChoiceStrategy =
1920 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1921 let pool = new FixedThreadPool(
1923 './tests/worker-files/thread/testWorker.mjs',
1924 { workerChoiceStrategy }
1927 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
1945 await pool.destroy()
1946 pool = new DynamicThreadPool(
1949 './tests/worker-files/thread/testWorker.mjs',
1950 { workerChoiceStrategy }
1953 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
1971 // We need to clean up the resources after our test
1972 await pool.destroy()
// Runs the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy on a FixedThreadPool,
// then inspects every worker node's usage statistics and the strategy's own
// bookkeeping.
1975 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
1976 const pool = new FixedThreadPool(
1978 './tests/worker-files/thread/testWorker.mjs',
1980 workerChoiceStrategy:
1981 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
1984 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
1985 const promises = new Set()
1986 const maxMultiplier = 2
1987 for (let i = 0; i < max * maxMultiplier; i++) {
1988 promises.add(pool.execute())
1990 await Promise.all(promises)
1991 for (const workerNode of pool.workerNodes) {
// Shape check: run time and wait time must carry a CircularArray history,
// while two further histories must equal a freshly constructed CircularArray.
1992 expect(workerNode.usage).toStrictEqual({
1994 executed: expect.any(Number),
1998 sequentiallyStolen: 0,
2002 runTime: expect.objectContaining({
2003 history: expect.any(CircularArray)
2005 waitTime: expect.objectContaining({
2006 history: expect.any(CircularArray)
2010 history: new CircularArray()
2013 history: new CircularArray()
2017 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2018 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
2021 if (workerNode.usage.runTime.aggregate == null) {
2022 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
2024 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
2026 if (workerNode.usage.runTime.average == null) {
2027 expect(workerNode.usage.runTime.average).toBeUndefined()
2029 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
2031 if (workerNode.usage.waitTime.aggregate == null) {
2032 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
2034 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
2036 if (workerNode.usage.waitTime.average == null) {
2037 expect(workerNode.usage.waitTime.average).toBeUndefined()
2039 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
// Internals of the strategy instance registered for the default strategy,
// among them `previousWorkerNodeKey`, `roundWeights.length` and a
// Number.isSafeInteger() check.
2043 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2044 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2048 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2049 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2053 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2054 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2058 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2059 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2060 ).previousWorkerNodeKey
2063 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2064 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2065 ).roundWeights.length
2068 Number.isSafeInteger(
2069 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2070 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2074 // We need to clean up the resources after our test
2075 await pool.destroy()
// Runs the INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy on a DynamicThreadPool,
// then inspects every worker node's usage statistics and the strategy's own
// bookkeeping.
2078 it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
2079 const pool = new DynamicThreadPool(
2082 './tests/worker-files/thread/testWorker.mjs',
2084 workerChoiceStrategy:
2085 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2088 // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
// Submit `max * maxMultiplier` tasks up front and wait for all of them.
2089 const promises = new Set()
2090 const maxMultiplier = 2
2091 for (let i = 0; i < max * maxMultiplier; i++) {
2092 promises.add(pool.execute())
2094 await Promise.all(promises)
2095 for (const workerNode of pool.workerNodes) {
// Shape check: run time and wait time must carry a CircularArray history,
// while two further histories must equal a freshly constructed CircularArray.
2096 expect(workerNode.usage).toStrictEqual({
2098 executed: expect.any(Number),
2102 sequentiallyStolen: 0,
2106 runTime: expect.objectContaining({
2107 history: expect.any(CircularArray)
2109 waitTime: expect.objectContaining({
2110 history: expect.any(CircularArray)
2114 history: new CircularArray()
2117 history: new CircularArray()
2121 expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
2122 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
// Each statistic is optional per worker node: a nullish value must be exactly
// `undefined`, and the paired numeric assertion covers a recorded value.
2125 if (workerNode.usage.runTime.aggregate == null) {
2126 expect(workerNode.usage.runTime.aggregate).toBeUndefined()
2128 expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
2130 if (workerNode.usage.runTime.average == null) {
2131 expect(workerNode.usage.runTime.average).toBeUndefined()
2133 expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
2135 if (workerNode.usage.waitTime.aggregate == null) {
2136 expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
2138 expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
2140 if (workerNode.usage.waitTime.average == null) {
2141 expect(workerNode.usage.waitTime.average).toBeUndefined()
2143 expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
// Internals of the strategy instance registered for the default strategy,
// among them `previousWorkerNodeKey`, `roundWeights.length` and a
// Number.isSafeInteger() check.
2147 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2148 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2152 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2153 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2157 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2158 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2162 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2163 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2164 ).previousWorkerNodeKey
2167 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2168 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2169 ).roundWeights.length
2172 Number.isSafeInteger(
2173 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2174 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2178 // We need to clean up the resources after our test
2179 await pool.destroy()
// Overwrites `roundId`, `workerNodeId`, `nextWorkerNodeKey`,
// `previousWorkerNodeKey` and `roundWeights` on the
// INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy instance, re-applies the same
// strategy through `setWorkerChoiceStrategy()`, and checks the seeded state
// is still there. Runs on a FixedThreadPool first, then on a DynamicThreadPool.
2182 it("Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
2183 const workerChoiceStrategy =
2184 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
2185 let pool = new FixedThreadPool(
2187 './tests/worker-files/thread/testWorker.mjs',
2188 { workerChoiceStrategy }
// randomInt's lower bound is inclusive, so every seeded number is >= 1 and
// `roundWeights` gets two entries; the `> 0` / `length > 1` assertions below
// rely on that.
2190 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2191 workerChoiceStrategy
2192 ).roundId = randomInt(1, max - 1)
2193 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2194 workerChoiceStrategy
2195 ).workerNodeId = randomInt(1, max - 1)
2196 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2197 workerChoiceStrategy
2198 ).nextWorkerNodeKey = randomInt(1, max - 1)
2199 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2200 workerChoiceStrategy
2201 ).previousWorkerNodeKey = randomInt(1, max - 1)
2202 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2203 workerChoiceStrategy
2204 ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
2205 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2207 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2208 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2210 ).toBeGreaterThan(0)
2212 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2213 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2215 ).toBeGreaterThan(0)
2217 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2218 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2220 ).toBeGreaterThan(0)
2222 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2223 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2224 ).previousWorkerNodeKey
2225 ).toBeGreaterThan(0)
2227 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2228 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2229 ).roundWeights.length
2230 ).toBeGreaterThan(1)
2232 Number.isSafeInteger(
2233 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2234 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
// The fixed pool is destroyed before `pool` is rebound to the dynamic pool.
2238 await pool.destroy()
2239 pool = new DynamicThreadPool(
2242 './tests/worker-files/thread/testWorker.mjs',
2243 { workerChoiceStrategy }
2245 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2246 workerChoiceStrategy
2247 ).roundId = randomInt(1, max - 1)
2248 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2249 workerChoiceStrategy
2250 ).workerNodeId = randomInt(1, max - 1)
2251 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2252 workerChoiceStrategy
2253 ).nextWorkerNodeKey = randomInt(1, max - 1)
2254 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2255 workerChoiceStrategy
2256 ).previousWorkerNodeKey = randomInt(1, max - 1)
2257 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2258 workerChoiceStrategy
2259 ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
2260 pool.setWorkerChoiceStrategy(workerChoiceStrategy)
2262 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2263 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2265 ).toBeGreaterThan(0)
2267 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2268 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2270 ).toBeGreaterThan(0)
2272 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2273 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2275 ).toBeGreaterThan(0)
2277 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2278 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2279 ).previousWorkerNodeKey
2280 ).toBeGreaterThan(0)
2282 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2283 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2284 ).roundWeights.length
2285 ).toBeGreaterThan(1)
2287 Number.isSafeInteger(
2288 pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
2289 pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
2293 // We need to clean up the resources after our test
2294 await pool.destroy()
// Constructing a DynamicThreadPool with a `workerChoiceStrategy` value of
// 'UNKNOWN_STRATEGY' is expected to throw an error whose message matches
// "Invalid worker choice strategy 'UNKNOWN_STRATEGY'".
2297 it('Verify unknown strategy throw error', () => {
2300 new DynamicThreadPool(
2303 './tests/worker-files/thread/testWorker.mjs',
2304 { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
2306 ).toThrow("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")