Merge pull request #747 from poolifier/multiple-functions
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies
9 } = require('../../../lib')
10 const { CircularArray } = require('../../../lib/circular-array')
11 const { Queue } = require('../../../lib/queue')
12
13 describe('Abstract pool test suite', () => {
// Worker count used for every pool created in this suite.
const numberOfWorkers = 1
// Error expected from getWorkerTasksUsage() once the worker nodes were removed.
const workerNotFoundInPoolError = new Error(
  'Worker could not be found in the pool worker nodes'
)
/**
 * Fixed thread pool stub exposing a way to drop every worker node, used to
 * simulate a "worker not found" situation.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Empties the worker nodes list and clears the pending promise response map.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose isMain() always answers false, used to simulate
 * a pool being created from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
29
it('Simulate pool creation from a non main thread/process', () => {
  // The stub pool's isMain() returns false, so the constructor is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: e => console.error(e)
      }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
42
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty file path must both be rejected.
  const invalidConstructions = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const construct of invalidConstructions) {
    expect(construct).toThrowError(expectedError)
  }
})
54
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no argument at all must be refused.
  const createWithoutWorkerCount = () => new FixedThreadPool()
  expect(createWithoutWorkerCount).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
60
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createWithNegativeCount = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createWithNegativeCount).toThrowError(expectedError)
})
71
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )
  const createWithFractionalCount = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createWithFractionalCount).toThrowError(expectedError)
})
82
it('Verify that pool options are checked', async () => {
  // First pool: no options given, so the default option values are asserted.
  const defaultPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Second pool: every option is overridden and must be reflected in `opts`.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    customOptions
  )
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
  expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
  expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
  expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
135
it('Verify that pool options are validated', async () => {
  // A tasks queue concurrency of 0 must be rejected at construction.
  const createWithZeroConcurrency = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0 }
      }
    )
  expect(createWithZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )

  // An unknown worker choice strategy name must be rejected at construction.
  const createWithUnknownStrategy = () =>
    new FixedThreadPool(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        workerChoiceStrategy: 'invalidStrategy'
      }
    )
  expect(createWithUnknownStrategy).toThrowError(
    "Invalid worker choice strategy 'invalidStrategy'"
  )
})
159
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  /**
   * Asserts that the `medRunTime` option is reflected in the pool options, in
   * every registered strategy, and in the required statistics (where
   * `avgRunTime` is expected to be the opposite of `medRunTime`).
   *
   * @param {boolean} medRunTime - Expected value of the option.
   */
  const expectMedRunTimeOption = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ medRunTime })
    const { workerChoiceStrategies } = pool.workerChoiceStrategyContext
    for (const [, strategy] of workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({ medRunTime })
    }
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
    ).toBe(!medRunTime)
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
    ).toBe(medRunTime)
  }

  // State right after construction.
  expectMedRunTimeOption(false)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectMedRunTimeOption(true)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectMedRunTimeOption(false)
  await pool.destroy()
})
209
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  /**
   * Asserts the current tasks queue state of the pool.
   *
   * @param {boolean} enabled - Expected `enableTasksQueue` value.
   * @param {object} [queueOptions] - Expected `tasksQueueOptions`, or nothing when they must be undefined.
   */
  const expectQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }

  // State right after construction, without any tasks queue option.
  expectQueueState(false)
  pool.enableTasksQueue(true)
  expectQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectQueueState(false)
  await pool.destroy()
})
228
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Concurrency observed when the queue is enabled without explicit options.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // A concurrency of 0 must be rejected.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
243
it('Simulate worker not found at getWorkerTasksUsage()', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is given the cluster
  // worker file — confirm whether the thread worker file was intended.
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  const readTasksUsage = () => pool.getWorkerTasksUsage()
  expect(readTasksUsage).toThrowError(workerNotFoundInPoolError)
  await pool.destroy()
})
261
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task has been submitted: every counter of every worker node must be zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
280
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each worker node must own an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
293
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  // Before the tasks settle: all of them count as running, none as run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(submittedTasks)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await Promise.all(pendingTasks)
  // After the tasks settle: all of them count as run, none as running.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
328
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // Usage accumulated by the executed tasks.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Changing the strategy must bring every counter back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
365
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  pool.emitter.on(PoolEvents.full, () => {
    fullEventCount += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(fullEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
384
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
402
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // Without a task function name the worker's default task function is used.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Each named task function is then exercised with the same input.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Release the cluster workers, as every other test of this suite does;
  // otherwise they outlive the test.
  await pool.destroy()
})
419 })