fix: fix worker function type definition and validation
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
9e619829 3 DynamicThreadPool,
aee46736 4 FixedClusterPool,
e843b904 5 FixedThreadPool,
aee46736 6 PoolEvents,
e843b904
JB
7 WorkerChoiceStrategies
8} = require('../../../lib/index')
78099a15 9const { CircularArray } = require('../../../lib/circular-array')
e1ffb94f
JB
10
11describe('Abstract pool test suite', () => {
12 const numberOfWorkers = 1
3032893a 13 const workerNotFoundInPoolError = new Error(
f06e48d8 14 'Worker could not be found in the pool worker nodes'
e1ffb94f 15 )
a8884ffd 16 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 17 removeAllWorker () {
d4aeae5a 18 this.workerNodes = []
c923ce56 19 this.promiseResponseMap.clear()
e1ffb94f 20 }
3ec964d6 21 }
a8884ffd 22 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
23 isMain () {
24 return false
25 }
3ec964d6 26 }
3ec964d6 27
3ec964d6 28 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
29 expect(
30 () =>
a8884ffd 31 new StubPoolWithIsMain(
7c0ba920 32 numberOfWorkers,
8d3782fa
JB
33 './tests/worker-files/thread/testWorker.js',
34 {
35 errorHandler: e => console.error(e)
36 }
37 )
d4aeae5a 38 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 39 })
c510fea7
APA
40
41 it('Verify that filePath is checked', () => {
292ad316
JB
42 const expectedError = new Error(
43 'Please specify a file with a worker implementation'
44 )
7c0ba920 45 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 46 expectedError
8d3782fa 47 )
7c0ba920 48 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 49 expectedError
8d3782fa
JB
50 )
51 })
52
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 55 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
56 )
57 })
58
59 it('Verify that a negative number of workers is checked', () => {
60 expect(
61 () =>
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
63 ).toThrowError(
473c717a
JB
64 new RangeError(
65 'Cannot instantiate a pool with a negative number of workers'
66 )
8d3782fa
JB
67 )
68 })
69
70 it('Verify that a non integer number of workers is checked', () => {
71 expect(
72 () =>
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
74 ).toThrowError(
473c717a 75 new TypeError(
8d3782fa
JB
76 'Cannot instantiate a pool with a non integer number of workers'
77 )
78 )
c510fea7 79 })
7c0ba920 80
fd7ebd49 81 it('Verify that pool options are checked', async () => {
7c0ba920
JB
82 let pool = new FixedThreadPool(
83 numberOfWorkers,
84 './tests/worker-files/thread/testWorker.js'
85 )
8620fb25 86 expect(pool.opts.enableEvents).toBe(true)
7c0ba920 87 expect(pool.emitter).toBeDefined()
ff733df7 88 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 89 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
90 expect(pool.opts.workerChoiceStrategy).toBe(
91 WorkerChoiceStrategies.ROUND_ROBIN
92 )
da309861
JB
93 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
94 medRunTime: false
95 })
35cf1c03
JB
96 expect(pool.opts.messageHandler).toBeUndefined()
97 expect(pool.opts.errorHandler).toBeUndefined()
98 expect(pool.opts.onlineHandler).toBeUndefined()
99 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 100 await pool.destroy()
35cf1c03 101 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
102 pool = new FixedThreadPool(
103 numberOfWorkers,
104 './tests/worker-files/thread/testWorker.js',
105 {
737c6d97 106 workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
da309861 107 workerChoiceStrategyOptions: { medRunTime: true },
35cf1c03 108 enableEvents: false,
ff733df7 109 enableTasksQueue: true,
d4aeae5a 110 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
111 messageHandler: testHandler,
112 errorHandler: testHandler,
113 onlineHandler: testHandler,
114 exitHandler: testHandler
7c0ba920
JB
115 }
116 )
8620fb25 117 expect(pool.opts.enableEvents).toBe(false)
7c0ba920 118 expect(pool.emitter).toBeUndefined()
ff733df7 119 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 120 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 121 expect(pool.opts.workerChoiceStrategy).toBe(
737c6d97 122 WorkerChoiceStrategies.LESS_USED
e843b904 123 )
da309861
JB
124 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
125 medRunTime: true
126 })
35cf1c03
JB
127 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
128 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
129 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
130 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 131 await pool.destroy()
7c0ba920
JB
132 })
133
d4aeae5a
JB
134 it('Verify that pool options are valid', async () => {
135 expect(
136 () =>
137 new FixedThreadPool(
138 numberOfWorkers,
139 './tests/worker-files/thread/testWorker.js',
140 {
141 enableTasksQueue: true,
142 tasksQueueOptions: { concurrency: 0 }
143 }
144 )
145 ).toThrowError("Invalid worker tasks concurrency '0'")
146 expect(
147 () =>
148 new FixedThreadPool(
149 numberOfWorkers,
150 './tests/worker-files/thread/testWorker.js',
151 {
152 workerChoiceStrategy: 'invalidStrategy'
153 }
154 )
155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
156 })
157
158 it('Simulate worker not found at getWorkerTasksUsage()', async () => {
a8884ffd 159 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4
JB
160 numberOfWorkers,
161 './tests/worker-files/cluster/testWorker.js',
162 {
10fcfaf4
JB
163 errorHandler: e => console.error(e)
164 }
165 )
d4aeae5a 166 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
167 // Simulate worker not found.
168 pool.removeAllWorker()
d4aeae5a 169 expect(pool.workerNodes.length).toBe(0)
3032893a
JB
170 expect(() => pool.getWorkerTasksUsage()).toThrowError(
171 workerNotFoundInPoolError
bf9549ae 172 )
fd7ebd49 173 await pool.destroy()
bf9549ae
JB
174 })
175
fd7ebd49 176 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
177 const pool = new FixedClusterPool(
178 numberOfWorkers,
179 './tests/worker-files/cluster/testWorker.js'
180 )
f06e48d8
JB
181 for (const workerNode of pool.workerNodes) {
182 expect(workerNode.tasksUsage).toBeDefined()
183 expect(workerNode.tasksUsage.run).toBe(0)
184 expect(workerNode.tasksUsage.running).toBe(0)
185 expect(workerNode.tasksUsage.runTime).toBe(0)
186 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
187 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
188 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
189 expect(workerNode.tasksUsage.medRunTime).toBe(0)
190 expect(workerNode.tasksUsage.error).toBe(0)
191 }
192 await pool.destroy()
193 })
194
195 it('Verify that worker pool tasks queue are initialized', async () => {
196 const pool = new FixedClusterPool(
197 numberOfWorkers,
198 './tests/worker-files/cluster/testWorker.js'
199 )
200 for (const workerNode of pool.workerNodes) {
201 expect(workerNode.tasksQueue).toBeDefined()
202 expect(workerNode.tasksQueue).toBeInstanceOf(Array)
203 expect(workerNode.tasksQueue.length).toBe(0)
bf9549ae 204 }
fd7ebd49 205 await pool.destroy()
bf9549ae
JB
206 })
207
208 it('Verify that worker pool tasks usage are computed', async () => {
209 const pool = new FixedClusterPool(
210 numberOfWorkers,
211 './tests/worker-files/cluster/testWorker.js'
212 )
213 const promises = []
214 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 215 promises.push(pool.execute())
bf9549ae 216 }
f06e48d8
JB
217 for (const workerNode of pool.workerNodes) {
218 expect(workerNode.tasksUsage).toBeDefined()
219 expect(workerNode.tasksUsage.run).toBe(0)
220 expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
221 expect(workerNode.tasksUsage.runTime).toBe(0)
222 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
223 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
224 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
225 expect(workerNode.tasksUsage.medRunTime).toBe(0)
226 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae
JB
227 }
228 await Promise.all(promises)
f06e48d8
JB
229 for (const workerNode of pool.workerNodes) {
230 expect(workerNode.tasksUsage).toBeDefined()
231 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
232 expect(workerNode.tasksUsage.running).toBe(0)
233 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
234 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
235 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
236 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
237 expect(workerNode.tasksUsage.medRunTime).toBe(0)
238 expect(workerNode.tasksUsage.error).toBe(0)
bf9549ae 239 }
fd7ebd49 240 await pool.destroy()
bf9549ae
JB
241 })
242
ee11a4a2 243 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 244 const pool = new DynamicThreadPool(
9e619829
JB
245 numberOfWorkers,
246 numberOfWorkers,
247 './tests/worker-files/thread/testWorker.js'
248 )
7fd82a1c 249 const promises = []
9e619829
JB
250 for (let i = 0; i < numberOfWorkers * 2; i++) {
251 promises.push(pool.execute())
252 }
253 await Promise.all(promises)
f06e48d8
JB
254 for (const workerNode of pool.workerNodes) {
255 expect(workerNode.tasksUsage).toBeDefined()
256 expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
257 expect(workerNode.tasksUsage.running).toBe(0)
258 expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
259 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
260 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
261 expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
262 expect(workerNode.tasksUsage.medRunTime).toBe(0)
263 expect(workerNode.tasksUsage.error).toBe(0)
9e619829
JB
264 }
265 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8
JB
266 for (const workerNode of pool.workerNodes) {
267 expect(workerNode.tasksUsage).toBeDefined()
268 expect(workerNode.tasksUsage.run).toBe(0)
269 expect(workerNode.tasksUsage.running).toBe(0)
270 expect(workerNode.tasksUsage.runTime).toBe(0)
271 expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
272 expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
273 expect(workerNode.tasksUsage.avgRunTime).toBe(0)
274 expect(workerNode.tasksUsage.medRunTime).toBe(0)
275 expect(workerNode.tasksUsage.error).toBe(0)
ee11a4a2 276 }
fd7ebd49 277 await pool.destroy()
ee11a4a2
JB
278 })
279
164d950a
JB
280 it("Verify that pool event emitter 'full' event can register a callback", async () => {
281 const pool = new DynamicThreadPool(
282 numberOfWorkers,
283 numberOfWorkers,
284 './tests/worker-files/thread/testWorker.js'
285 )
286 const promises = []
287 let poolFull = 0
aee46736 288 pool.emitter.on(PoolEvents.full, () => ++poolFull)
164d950a
JB
289 for (let i = 0; i < numberOfWorkers * 2; i++) {
290 promises.push(pool.execute())
291 }
292 await Promise.all(promises)
594bfb84 293 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
164d950a
JB
294 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
295 expect(poolFull).toBe(numberOfWorkers + 1)
296 await pool.destroy()
297 })
298
cf597bc5 299 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
300 const pool = new FixedThreadPool(
301 numberOfWorkers,
302 './tests/worker-files/thread/testWorker.js'
303 )
304 const promises = []
305 let poolBusy = 0
aee46736 306 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 307 for (let i = 0; i < numberOfWorkers * 2; i++) {
6db75ad9 308 promises.push(pool.execute())
7c0ba920 309 }
cf597bc5 310 await Promise.all(promises)
14916bf9
JB
311 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
312 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
313 expect(poolBusy).toBe(numberOfWorkers + 1)
fd7ebd49 314 await pool.destroy()
7c0ba920 315 })
3ec964d6 316})