feat: allow to disable tasks timeout check in worker
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
e843b904 8 WorkerChoiceStrategies
cdace0e5 9} = require('../../../lib')
78099a15 10const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 11const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
12
13describe('Abstract pool test suite', () => {
fc027381 14 const numberOfWorkers = 2
/**
 * Fixed thread pool stub exposing a way to drop every worker node at once.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the worker nodes list with an empty one and clears the
   * promise response map.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 26
it('Simulate pool creation from a non main thread/process', () => {
  // Building the stub pool is expected to throw.
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: e => console.error(e)
      }
    )
  expect(createPool).toThrowError('Cannot start a pool from a worker!')
})
c510fea7
APA
39
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty file path must both be rejected.
  const invalidArgumentLists = [[numberOfWorkers], [numberOfWorkers, '']]
  for (const constructorArgs of invalidArgumentLists) {
    expect(() => new FixedThreadPool(...constructorArgs)).toThrowError(
      expectedError
    )
  }
})
51
it('Verify that numberOfWorkers is checked', () => {
  // No constructor argument at all: the number of workers is missing.
  const createPoolWithoutArguments = () => new FixedThreadPool()
  expect(createPoolWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
57
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
68
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
7c0ba920 79
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // First pool: no options object is given to the constructor.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false,
    medWaitTime: false
  })
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Second pool: every option is given explicitly and must be found back in
  // `opts`.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      medRunTime: true,
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true,
    weights: { 0: 300, 1: 200 }
  })
  expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
  expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
  expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
  expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
137
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the error message the
  // constructor is expected to throw for it.
  const invalidOptionsCases = [
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, invalidOptions)
    ).toThrowError(expectedMessage)
  }
})
173
a20f0ba5
JB
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Checks, in this order: the options stored on the pool, the options held
  // by every strategy of the context, then the required statistics.
  const expectOptionsAndStatistics = (expectedOptions, expectedStatistics) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics()
    ).toStrictEqual(expectedStatistics)
  }

  // State right after construction.
  expectOptionsAndStatistics(
    { medRunTime: false, medWaitTime: false },
    {
      runTime: true,
      avgRunTime: true,
      medRunTime: false,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false
    }
  )

  // Turning the median run time on.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectOptionsAndStatistics(
    { medRunTime: true },
    {
      runTime: true,
      avgRunTime: false,
      medRunTime: true,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false
    }
  )

  // Turning the median run time back off.
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectOptionsAndStatistics(
    { medRunTime: false },
    {
      runTime: true,
      avgRunTime: true,
      medRunTime: false,
      waitTime: false,
      avgWaitTime: false,
      medWaitTime: false
    }
  )
  await pool.destroy()
})
239
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the enable flag and the queue options in one go; `undefined`
  // means the options are expected to be unset.
  const expectTasksQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }

  expectTasksQueueState(false, undefined)
  // Enabling without options is expected to yield a concurrency of 1.
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)
  await pool.destroy()
})
258
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // A concurrency of zero must be refused.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
273
it('Simulate worker not found', async () => {
  // NOTE(review): the stub extends FixedThreadPool, yet it is given the
  // cluster worker file while the other thread pools of this suite use
  // './tests/worker-files/thread/testWorker.js' - confirm this is intended.
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new StubPoolWithRemoveAllWorker(numberOfWorkers, workerFile, {
    errorHandler: e => console.error(e)
  })
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
288
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Tasks usage expected on every worker node of a pool that has not
  // executed anything yet.
  const initialTasksUsage = {
    run: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  }
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(initialTasksUsage)
  }
  await pool.destroy()
})
311
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must own an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  }
  await pool.destroy()
})
324
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds the expected tasks usage; only the `run` and `running` counters
  // differ between the two checkpoints of this test.
  const expectedTasksUsage = (run, running) => ({
    run,
    running,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  })
  const maxMultiplier = 2
  // Submit maxMultiplier tasks per worker without awaiting them yet.
  const pendingTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  // While nothing has been awaited, the tasks are counted as running.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(0, maxMultiplier))
  }
  await Promise.all(pendingTasks)
  // Once all tasks are settled, they are counted as run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
368
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Builds the expected tasks usage for a given `run` expectation; all the
  // other counters are expected to be zero at both checkpoints.
  const expectedTasksUsage = run => ({
    run,
    running: 0,
    runTime: 0,
    runTimeHistory: expect.any(CircularArray),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: expect.any(CircularArray),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  })
  const maxMultiplier = 2
  const submittedTasks = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  await Promise.all(submittedTasks)
  // After execution, each worker node has run between 1 and maxMultiplier
  // tasks.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(expect.any(Number)))
    expect(tasksUsage.run).toBeGreaterThan(0)
    expect(tasksUsage.run).toBeLessThanOrEqual(maxMultiplier)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Changing the strategy must bring every counter and history back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toStrictEqual(expectedTasksUsage(0))
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.waitTimeHistory.length).toBe(0)
  }
  await pool.destroy()
})
418
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventCount = 0
  pool.emitter.on(PoolEvents.full, () => {
    fullEventCount += 1
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
  expect(fullEventCount).toBe(numberOfWorkers * 2)
  await pool.destroy()
})
437
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
70a4f5ea
JB
455
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name given.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Explicitly named tasks.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Release the pool like every other test of this suite does, so that its
  // workers do not outlive the test.
  await pool.destroy()
})
3ec964d6 472})