Merge branch 'master' into interleaved-weighted-round-robin-worker-choice-strategy
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies
9 } = require('../../../lib')
10 const { CircularArray } = require('../../../lib/circular-array')
11 const { Queue } = require('../../../lib/queue')
12
13 describe('Abstract pool test suite', () => {
14 const numberOfWorkers = 1
/**
 * Fixed thread pool stub that can drop every worker node on demand.
 * Used by the suite to simulate a pool whose workers cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the worker nodes list with an empty array, then clears the
   * promise response map. Nothing here terminates the dropped workers.
   */
  removeAllWorker () {
    this.workerNodes = []
    const pendingResponses = this.promiseResponseMap
    pendingResponses.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
26
it('Simulate pool creation from a non main thread/process', () => {
  // The stub answers `false` from isMain(), so construction is expected to throw.
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: err => console.error(err) }
    )
  expect(createPoolOutsideMain).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
39
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Constructor argument lists with a missing, then an empty, file path.
  const invalidArgLists = [[numberOfWorkers], [numberOfWorkers, '']]
  for (const ctorArgs of invalidArgLists) {
    expect(() => new FixedThreadPool(...ctorArgs)).toThrowError(
      missingFileError
    )
  }
})
51
it('Verify that numberOfWorkers is checked', () => {
  // The pool is constructed without any argument at all.
  const createPoolWithoutArgs = () => new FixedThreadPool()
  expect(createPoolWithoutArgs).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
57
it('Verify that a negative number of workers is checked', () => {
  const negativeWorkersError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrowError(negativeWorkersError)
})
68
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerWorkersError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithFractionalSize).toThrowError(nonIntegerWorkersError)
})
79
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // First pool: no options object, so the default option values are asserted.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Second pool: every option is supplied and must be reflected in `opts`.
  const handlerStub = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: handlerStub,
    errorHandler: handlerStub,
    onlineHandler: handlerStub,
    exitHandler: handlerStub
  })
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  expect(customPool.opts.messageHandler).toStrictEqual(handlerStub)
  expect(customPool.opts.errorHandler).toStrictEqual(handlerStub)
  expect(customPool.opts.onlineHandler).toStrictEqual(handlerStub)
  expect(customPool.opts.exitHandler).toStrictEqual(handlerStub)
  await customPool.destroy()
})
132
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the expected error message.
  const invalidOptionsCases = [
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ]
  ]
  for (const [poolOptions, expectedMessage] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrowError(expectedMessage)
  }
})
156
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts that the pool options, every registered strategy and the required
  // statistics all agree on the given `medRunTime` flag; `avgRunTime` is
  // expected to be its opposite.
  const expectMedRunTimeState = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ medRunTime })
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual({ medRunTime })
    }
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
    ).toBe(!medRunTime)
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
    ).toBe(medRunTime)
  }

  expectMedRunTimeState(false)
  // Toggle the option on, then back off, re-checking after each change.
  for (const medRunTime of [true, false]) {
    pool.setWorkerChoiceStrategyOptions({ medRunTime })
    expectMedRunTimeState(medRunTime)
  }
  await pool.destroy()
})
206
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Disabled state: flag is false and no queue options are set.
  const expectQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Enabled state: flag is true and options carry the given concurrency.
  const expectQueueEnabled = concurrency => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
  }

  expectQueueDisabled()
  pool.enableTasksQueue(true)
  expectQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectQueueDisabled()
  await pool.destroy()
})
225
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  const expectConcurrency = concurrency => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
  }

  expectConcurrency(1)
  pool.setTasksQueueOptions({ concurrency: 2 })
  expectConcurrency(2)
  // A concurrency of zero must be rejected.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
240
it('Simulate worker not found', async () => {
  // NOTE(review): the stub derives from FixedThreadPool yet is handed a
  // cluster worker file — confirm this pairing is intentional.
  const stubPool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    { errorHandler: err => console.error(err) }
  )
  expect(stubPool.workerNodes.length).toBe(numberOfWorkers)
  // Drop every worker node to simulate workers that cannot be found.
  stubPool.removeAllWorker()
  expect(stubPool.workerNodes.length).toBe(0)
  await stubPool.destroy()
})
255
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task has been submitted: every counter of every worker node is zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
274
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Each worker node must own an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
287
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  // Before the tasks settle: all of them are counted as running, none as run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(submittedTasks)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await Promise.all(pendingTasks)
  // After the tasks settle: all of them are counted as run, none as running.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
322
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // Usage recorded once the submitted tasks have completed.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Changing the strategy is expected to bring every counter back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
359
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  pool.emitter.on(PoolEvents.full, () => ++fullEventCount)
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(fullEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
378
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
396
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // Each call is awaited before the next one is submitted.
  // First call names no task function; the others select one explicitly.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Shut the pool down, as the other tests in this suite do, so that no
  // worker outlives the test.
  await pool.destroy()
})
413 })