Commit | Line | Data |
---|---|---|
a074ffee | 1 | import { expect } from 'expect' |
ded253e2 | 2 | |
8e8d9101 JB |
3 | import { |
4 | DynamicThreadPool, | |
5 | PoolEvents, | |
6 | WorkerChoiceStrategies | |
7 | } from '../../../lib/index.cjs' | |
d35e5717 | 8 | import { TaskFunctions } from '../../test-types.cjs' |
8e8d9101 | 9 | import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs' |
506c2a14 | 10 | |
a35560ba | 11 | describe('Dynamic thread pool test suite', () => { |
e1ffb94f JB |
12 | const min = 1 |
13 | const max = 3 | |
14 | const pool = new DynamicThreadPool( | |
15 | min, | |
16 | max, | |
b2fd3f4a | 17 | './tests/worker-files/thread/testWorker.mjs', |
e1ffb94f | 18 | { |
041dc05b | 19 | errorHandler: e => console.error(e) |
e1ffb94f JB |
20 | } |
21 | ) | |
22 | ||
506c2a14 | 23 | it('Verify that the function is executed in a worker thread', async () => { |
6db75ad9 | 24 | let result = await pool.execute({ |
dbca3be9 | 25 | function: TaskFunctions.fibonacci |
6db75ad9 | 26 | }) |
66f0c14c | 27 | expect(result).toBe(354224848179262000000) |
6db75ad9 | 28 | result = await pool.execute({ |
dbca3be9 | 29 | function: TaskFunctions.factorial |
6db75ad9 | 30 | }) |
70a4f5ea | 31 | expect(result).toBe(9.33262154439441e157) |
506c2a14 | 32 | }) |
33 | ||
34 | it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { | |
7c0ba920 | 35 | let poolBusy = 0 |
aee46736 | 36 | pool.emitter.on(PoolEvents.busy, () => ++poolBusy) |
cf9aa6c3 | 37 | for (let i = 0; i < max * 2; i++) { |
8cbb82eb | 38 | pool.execute() |
506c2a14 | 39 | } |
f06e48d8 JB |
40 | expect(pool.workerNodes.length).toBeLessThanOrEqual(max) |
41 | expect(pool.workerNodes.length).toBeGreaterThan(min) | |
94407def | 42 | expect(poolBusy).toBe(1) |
bac873bd | 43 | const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min) |
bdacc2d2 | 44 | expect(numberOfExitEvents).toBe(max - min) |
9b5c72ff | 45 | expect(pool.workerNodes.length).toBe(min) |
506c2a14 | 46 | }) |
47 | ||
bcf04003 | 48 | it('Verify scale thread up and down is working', async () => { |
e211bc18 | 49 | for (let i = 0; i < max * 2; i++) { |
6db75ad9 | 50 | pool.execute() |
bcf04003 | 51 | } |
f06e48d8 | 52 | expect(pool.workerNodes.length).toBe(max) |
bac873bd | 53 | await waitWorkerEvents(pool, 'exit', max - min) |
f06e48d8 | 54 | expect(pool.workerNodes.length).toBe(min) |
e211bc18 | 55 | for (let i = 0; i < max * 2; i++) { |
6db75ad9 | 56 | pool.execute() |
bcf04003 | 57 | } |
f06e48d8 | 58 | expect(pool.workerNodes.length).toBe(max) |
bac873bd | 59 | await waitWorkerEvents(pool, 'exit', max - min) |
f06e48d8 | 60 | expect(pool.workerNodes.length).toBe(min) |
bcf04003 | 61 | }) |
c01733f1 | 62 | |
506c2a14 | 63 | it('Shutdown test', async () => { |
bac873bd | 64 | const exitPromise = waitWorkerEvents(pool, 'exit', min) |
c726f66c | 65 | expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy]) |
ef3891a3 JB |
66 | let poolDestroy = 0 |
67 | pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy) | |
c726f66c JB |
68 | expect(pool.emitter.eventNames()).toStrictEqual([ |
69 | PoolEvents.busy, | |
70 | PoolEvents.destroy | |
71 | ]) | |
1f9a5a44 | 72 | await pool.destroy() |
cf597bc5 | 73 | const numberOfExitEvents = await exitPromise |
bb9423b7 | 74 | expect(pool.started).toBe(false) |
a621172f JB |
75 | expect(pool.emitter.eventNames()).toStrictEqual([ |
76 | PoolEvents.busy, | |
77 | PoolEvents.destroy | |
78 | ]) | |
55082af9 | 79 | expect(pool.readyEventEmitted).toBe(false) |
bb9423b7 | 80 | expect(pool.workerNodes.length).toBe(0) |
cf597bc5 | 81 | expect(numberOfExitEvents).toBe(min) |
ef3891a3 | 82 | expect(poolDestroy).toBe(1) |
506c2a14 | 83 | }) |
84 | ||
8d3782fa | 85 | it('Validation of inputs test', () => { |
948faff7 | 86 | expect(() => new DynamicThreadPool(min)).toThrow( |
c3719753 | 87 | 'The worker file path must be specified' |
85a3f8a7 | 88 | ) |
506c2a14 | 89 | }) |
90 | ||
91 | it('Should work even without opts in input', async () => { | |
0fe39c97 | 92 | const pool = new DynamicThreadPool( |
e1ffb94f JB |
93 | min, |
94 | max, | |
b2fd3f4a | 95 | './tests/worker-files/thread/testWorker.mjs' |
325f50bc | 96 | ) |
0fe39c97 | 97 | const res = await pool.execute() |
30b963d4 | 98 | expect(res).toStrictEqual({ ok: 1 }) |
0e2503fc | 99 | // We need to clean up the resources after our test |
0fe39c97 | 100 | await pool.destroy() |
506c2a14 | 101 | }) |
c01733f1 | 102 | |
1c6fe997 | 103 | it('Verify scale thread up and down is working when long executing task is used:hard', async () => { |
c01733f1 | 104 | const longRunningPool = new DynamicThreadPool( |
105 | min, | |
106 | max, | |
b2fd3f4a | 107 | './tests/worker-files/thread/longRunningWorkerHardBehavior.mjs', |
4c35177b | 108 | { |
041dc05b | 109 | errorHandler: e => console.error(e), |
73bfd59d JB |
110 | onlineHandler: () => console.info('long executing worker is online'), |
111 | exitHandler: () => console.info('long executing worker exited') | |
4c35177b | 112 | } |
113 | ) | |
f06e48d8 | 114 | expect(longRunningPool.workerNodes.length).toBe(min) |
e211bc18 | 115 | for (let i = 0; i < max * 2; i++) { |
6db75ad9 | 116 | longRunningPool.execute() |
4c35177b | 117 | } |
f06e48d8 | 118 | expect(longRunningPool.workerNodes.length).toBe(max) |
bac873bd | 119 | await waitWorkerEvents(longRunningPool, 'exit', max - min) |
f06e48d8 | 120 | expect(longRunningPool.workerNodes.length).toBe(min) |
d710242d | 121 | expect( |
bcfb06ce JB |
122 | longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get( |
123 | longRunningPool.workerChoiceStrategiesContext | |
124 | .defaultWorkerChoiceStrategy | |
9b106837 | 125 | ).nextWorkerNodeKey |
d710242d | 126 | ).toBeLessThan(longRunningPool.workerNodes.length) |
0e2503fc JB |
127 | // We need to clean up the resources after our test |
128 | await longRunningPool.destroy() | |
4c35177b | 129 | }) |
130 | ||
1c6fe997 | 131 | it('Verify scale thread up and down is working when long executing task is used:soft', async () => { |
4c35177b | 132 | const longRunningPool = new DynamicThreadPool( |
133 | min, | |
134 | max, | |
b2fd3f4a | 135 | './tests/worker-files/thread/longRunningWorkerSoftBehavior.mjs', |
c01733f1 | 136 | { |
041dc05b | 137 | errorHandler: e => console.error(e), |
73bfd59d JB |
138 | onlineHandler: () => console.info('long executing worker is online'), |
139 | exitHandler: () => console.info('long executing worker exited') | |
c01733f1 | 140 | } |
141 | ) | |
f06e48d8 | 142 | expect(longRunningPool.workerNodes.length).toBe(min) |
e211bc18 | 143 | for (let i = 0; i < max * 2; i++) { |
6db75ad9 | 144 | longRunningPool.execute() |
c01733f1 | 145 | } |
f06e48d8 | 146 | expect(longRunningPool.workerNodes.length).toBe(max) |
920278a2 | 147 | await sleep(1000) |
1c6fe997 | 148 | // Here we expect the workerNodes to be at the max size since the task is still executing |
f06e48d8 | 149 | expect(longRunningPool.workerNodes.length).toBe(max) |
0e2503fc JB |
150 | // We need to clean up the resources after our test |
151 | await longRunningPool.destroy() | |
c01733f1 | 152 | }) |
8d3782fa JB |
153 | |
154 | it('Verify that a pool with zero worker can be instantiated', async () => { | |
155 | const pool = new DynamicThreadPool( | |
156 | 0, | |
157 | max, | |
b2fd3f4a | 158 | './tests/worker-files/thread/testWorker.mjs' |
8d3782fa JB |
159 | ) |
160 | expect(pool).toBeInstanceOf(DynamicThreadPool) | |
161 | // We need to clean up the resources after our test | |
162 | await pool.destroy() | |
163 | }) | |
e44639e9 JB |
164 | |
165 | it('Verify that a pool with zero worker works', async () => { | |
8e8d9101 | 166 | for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { |
6d7beb8c JB |
167 | const pool = new DynamicThreadPool( |
168 | 0, | |
169 | max, | |
170 | './tests/worker-files/thread/testWorker.mjs', | |
171 | { | |
172 | workerChoiceStrategy | |
173 | } | |
174 | ) | |
175 | expect(pool.starting).toBe(false) | |
8e8d9101 JB |
176 | expect(pool.readyEventEmitted).toBe(false) |
177 | for (let run = 0; run < 2; run++) { | |
178 | run % 2 !== 0 && pool.enableTasksQueue(true) | |
179 | const maxMultiplier = 4 | |
180 | const promises = new Set() | |
181 | expect(pool.workerNodes.length).toBe(pool.info.minSize) | |
182 | for (let i = 0; i < max * maxMultiplier; i++) { | |
183 | promises.add(pool.execute()) | |
184 | } | |
185 | await Promise.all(promises) | |
186 | expect(pool.readyEventEmitted).toBe(true) | |
187 | expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize) | |
188 | expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize) | |
189 | await waitPoolEvents(pool, PoolEvents.empty, 1) | |
190 | expect(pool.readyEventEmitted).toBe(false) | |
191 | expect(pool.workerNodes.length).toBe(pool.info.minSize) | |
28881126 | 192 | } |
6d7beb8c JB |
193 | // We need to clean up the resources after our test |
194 | await pool.destroy() | |
a2283a19 | 195 | } |
e44639e9 | 196 | }) |
506c2a14 | 197 | }) |