test: improve INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy test
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
b1aae695 2const sinon = require('sinon')
e843b904 3const {
70a4f5ea 4 DynamicClusterPool,
9e619829 5 DynamicThreadPool,
aee46736 6 FixedClusterPool,
e843b904 7 FixedThreadPool,
aee46736 8 PoolEvents,
184855e6 9 PoolTypes,
3d6dd312 10 WorkerChoiceStrategies,
184855e6 11 WorkerTypes
cdace0e5 12} = require('../../../lib')
78099a15 13const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 14const { Queue } = require('../../../lib/queue')
23ccf9d7 15const { version } = require('../../../package.json')
2431bdb4 16const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
17
18describe('Abstract pool test suite', () => {
fc027381 19 const numberOfWorkers = 2
/**
 * Fixed thread pool test double whose `isMain()` always answers `false`.
 * Used below to simulate a pool being created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 25
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports isMain() === false, so the constructor is expected to throw.
  const poolOptions = {
    errorHandler: (e) => console.error(e)
  }
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )
  expect(createStubPool).toThrowError(
    'Cannot start a pool from a worker with the same type as the pool'
  )
})
c510fea7
APA
40
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // No file path argument at all.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Explicit but unusable file path values.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A file path that does not exist.
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
61
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no arguments must be rejected.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
67
it('Verify that a negative number of workers is checked', () => {
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
78
it('Verify that a non integer number of workers is checked', () => {
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
7c0ba920 89
it('Verify that dynamic pool sizing is checked', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  // Each entry is one invalid (min, max) sizing and the error it must raise.
  const invalidSizings = [
    {
      PoolClass: DynamicClusterPool,
      min: 1,
      max: undefined,
      workerFile: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 0.5,
      max: 1,
      workerFile: threadWorkerFile,
      error: new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    },
    {
      PoolClass: DynamicClusterPool,
      min: 0,
      max: 0.5,
      workerFile: clusterWorkerFile,
      error: new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 2,
      max: 1,
      workerFile: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    },
    {
      PoolClass: DynamicClusterPool,
      min: 1,
      max: 1,
      workerFile: clusterWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    },
    {
      PoolClass: DynamicThreadPool,
      min: 0,
      max: 0,
      workerFile: threadWorkerFile,
      error: new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    }
  ]
  for (const { PoolClass, min, max, workerFile, error } of invalidSizings) {
    expect(() => new PoolClass(min, max, workerFile)).toThrowError(error)
  }
})
156
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerOptionNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: no options given, so every option must carry its default.
  const defaultStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    defaultStrategyOptions
  )
  expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
    defaultStrategyOptions
  )
  for (const handlerName of handlerOptionNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pool: every option is overridden and must be reflected in opts.
  const testHandler = () => console.info('test handler executed')
  const customStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    customStrategyOptions
  )
  expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
    customStrategyOptions
  )
  for (const handlerName of handlerOptionNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
235
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the error the constructor must throw.
  const invalidOptionsCases = [
    {
      opts: { workerChoiceStrategy: 'invalidStrategy' },
      error: new Error("Invalid worker choice strategy 'invalidStrategy'")
    },
    {
      opts: { workerChoiceStrategyOptions: { weights: {} } },
      error: new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    },
    {
      opts: {
        workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
      },
      error: new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    },
    {
      opts: {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0 }
      },
      error: new TypeError(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    },
    {
      opts: {
        enableTasksQueue: true,
        tasksQueueOptions: 'invalidTasksQueueOptions'
      },
      error: new TypeError(
        'Invalid tasks queue options: must be a plain object'
      )
    },
    {
      opts: {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0.2 }
      },
      error: new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
  ]
  for (const { opts, error } of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, opts)
    ).toThrowError(error)
  }
})
319
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Builds the options object expected for the given runTime/elu median flags.
  const strategyOptionsWithMedian = (median) => ({
    choiceRetries: 6,
    runTime: { median },
    waitTime: { median: false },
    elu: { median }
  })
  // Builds the task statistics requirements expected for the given median flag:
  // average and median are mutually exclusive for runTime and elu.
  const statisticsRequirementsWithMedian = (median) => ({
    runTime: {
      aggregate: true,
      average: !median,
      median
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: !median,
      median
    }
  })
  // Asserts the pool, its strategy context and every strategy agree on the options.
  const expectPoolState = (median) => {
    const expectedOptions = strategyOptionsWithMedian(median)
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(statisticsRequirementsWithMedian(median))
  }

  // Defaults, then median enabled, then median disabled again.
  expectPoolState(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectPoolState(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectPoolState(false)

  // Invalid options must be rejected by the setter.
  const invalidOptionsCases = [
    {
      options: 'invalidWorkerChoiceStrategyOptions',
      error: new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    },
    {
      options: { weights: {} },
      error: new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    },
    {
      options: { measurement: 'invalidMeasurement' },
      error: new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    }
  ]
  for (const { options, error } of invalidOptionsCases) {
    expect(() => pool.setWorkerChoiceStrategyOptions(options)).toThrowError(
      error
    )
  }
  await pool.destroy()
})
477
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the queue flag and, when enabled, the effective queue options.
  const expectTasksQueueState = (enabled, expectedOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (expectedOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(expectedOptions)
    }
  }
  expectTasksQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)
  await pool.destroy()
})
496
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // Each entry pairs an invalid setter argument with the error it must raise.
  const invalidOptionsCases = [
    {
      options: 'invalidTasksQueueOptions',
      error: new TypeError(
        'Invalid tasks queue options: must be a plain object'
      )
    },
    {
      options: { concurrency: 0 },
      error: new Error(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    },
    {
      options: { concurrency: -1 },
      error: new Error(
        'Invalid worker tasks concurrency: -1 is a negative integer or zero'
      )
    },
    {
      options: { concurrency: 0.2 },
      error: new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
  ]
  for (const { options, error } of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(options)).toThrowError(error)
  }
  await pool.destroy()
})
526
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max size both equal the number of workers.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const expectedFixedPoolInfo = {
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }
  expect(fixedPool.info).toStrictEqual(expectedFixedPoolInfo)
  await fixedPool.destroy()

  // Dynamic cluster pool: only the minimum number of workers is expected at start.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const expectedDynamicPoolInfo = {
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }
  expect(dynamicPool.info).toStrictEqual(expectedDynamicPoolInfo)
  await dynamicPool.destroy()
})
570
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Usage expected on a worker node that has not been given any task yet.
  const measurementHistory = { history: expect.any(CircularArray) }
  const expectedInitialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: measurementHistory,
    waitTime: measurementHistory,
    elu: {
      idle: measurementHistory,
      active: measurementHistory
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedInitialUsage)
  }
  await pool.destroy()
})
603
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts every worker node of the pool starts with an empty tasks queue.
  const expectEmptyTasksQueues = (pool) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueue).toBeDefined()
      expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  // Destroy the second pool as well so its workers do not outlive the test.
  await pool.destroy()
})
628
it('Verify that pool worker info are initialized', async () => {
  // Asserts every worker node of the pool reports the expected initial info.
  const expectWorkerNodesInfo = (pool, workerType) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.info).toStrictEqual({
        id: expect.any(Number),
        type: workerType,
        dynamic: false,
        ready: true
      })
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.cluster)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectWorkerNodesInfo(pool, WorkerTypes.thread)
  // Destroy the second pool as well so its workers do not outlive the test.
  await pool.destroy()
})
657
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the usage expected on each worker node for the given task counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Submit maxMultiplier tasks per worker without waiting for their completion.
  const submittedTasks = []
  for (let task = 0; task < numberOfWorkers * maxMultiplier; task++) {
    submittedTasks.push(pool.execute())
  }
  // Before completion: every submitted task is counted as executing.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(submittedTasks)
  // After completion: every submitted task is counted as executed.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
721
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  // Builds the usage expected on a worker node for the given executed tasks matcher/value.
  const expectedUsage = (executed) => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: { history: expect.any(CircularArray) },
    waitTime: { history: expect.any(CircularArray) },
    elu: {
      idle: { history: expect.any(CircularArray) },
      active: { history: expect.any(CircularArray) }
    }
  })
  // Asserts that no measurement has been recorded in any usage history.
  const expectEmptyHistories = (usage) => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }
  const submittedTasks = []
  for (let task = 0; task < numberOfWorkers * maxMultiplier; task++) {
    submittedTasks.push(pool.execute())
  }
  await Promise.all(submittedTasks)
  // After the run: each worker node executed between 1 and maxMultiplier tasks.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expectEmptyHistories(usage)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed tasks counter is back to zero.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(usage)
  }
  await pool.destroy()
})
797
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Record the info payload of every 'ready' event received.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, (info) => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  const lastReadyInfo = readyInfos[readyInfos.length - 1]
  expect(lastReadyInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
829
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Record the info payload of every 'busy' event received.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, (info) => {
    busyInfos.push(info)
  })
  const submittedTasks = []
  for (let task = 0; task < numberOfWorkers * 2; task++) {
    submittedTasks.push(pool.execute())
  }
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const lastBusyInfo = busyInfos[busyInfos.length - 1]
  expect(lastBusyInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
866
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Record the info payload of every 'full' event received.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, (info) => {
    fullInfos.push(info)
  })
  const submittedTasks = []
  for (let task = 0; task < numberOfWorkers * 2; task++) {
    submittedTasks.push(pool.execute())
  }
  await Promise.all(submittedTasks)
  expect(fullInfos.length).toBe(1)
  const lastFullInfo = fullInfos[fullInfos.length - 1]
  expect(lastFullInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
902
// Still skipped, as in the original; only the wrong pool type expectation and
// the leftover debug comments were fixed here.
it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true
    }
  )
  // Make each worker node report back pressure on its first check only.
  for (const workerNode of pool.workerNodes) {
    workerNode.hasBackPressure = sinon
      .stub()
      .onFirstCall()
      .returns(true)
      .returns(false)
  }
  const promises = new Set()
  let poolBackPressure = 0
  let poolInfo
  pool.emitter.on(PoolEvents.backPressure, (info) => {
    ++poolBackPressure
    poolInfo = info
  })
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  expect(poolBackPressure).toBe(1)
  // NOTE(review): with the tasks queue enabled the pool info may expose
  // additional queue related fields - confirm before enabling this test.
  expect(poolInfo).toStrictEqual({
    version,
    // The pool under test is a FixedThreadPool, hence a fixed pool type.
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: expect.any(Number),
    maxSize: expect.any(Number),
    workerNodes: expect.any(Number),
    idleWorkerNodes: expect.any(Number),
    busyWorkerNodes: expect.any(Number),
    executedTasks: expect.any(Number),
    executingTasks: expect.any(Number),
    failedTasks: expect.any(Number)
  })
  await pool.destroy()
})
70a4f5ea 949
it('Verify that listTaskFunctions() is working', async () => {
  // Task function names expected from both multiple task functions worker files.
  const expectedTaskFunctions = [
    'default',
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ]
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctions()).toStrictEqual(
    expectedTaskFunctions
  )
  // Destroy both pools so their workers do not outlive the test.
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
975
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // Run the default task function, then each named one, on the same input.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctions).toStrictEqual([
      'default',
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctions()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: expect.any(Number),
          failed: 0,
          queued: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
      ).toBeGreaterThanOrEqual(0)
    }
  }
  // Destroy the pool so its workers do not outlive the test.
  await pool.destroy()
})
3ec964d6 1030})