feat: add minimum and maximum to internal measurements
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
23ccf9d7 14const { version } = require('../../../package.json')
e1ffb94f
JB
15
16describe('Abstract pool test suite', () => {
fc027381 17 const numberOfWorkers = 2
a8884ffd 18 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 19 removeAllWorker () {
d4aeae5a 20 this.workerNodes = []
c923ce56 21 this.promiseResponseMap.clear()
e1ffb94f 22 }
3ec964d6 23 }
a8884ffd 24 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
25 isMain () {
26 return false
27 }
3ec964d6 28 }
3ec964d6 29
3ec964d6 30 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
31 expect(
32 () =>
a8884ffd 33 new StubPoolWithIsMain(
7c0ba920 34 numberOfWorkers,
8d3782fa
JB
35 './tests/worker-files/thread/testWorker.js',
36 {
37 errorHandler: e => console.error(e)
38 }
39 )
d4aeae5a 40 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 41 })
c510fea7
APA
42
43 it('Verify that filePath is checked', () => {
292ad316
JB
44 const expectedError = new Error(
45 'Please specify a file with a worker implementation'
46 )
7c0ba920 47 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 48 expectedError
8d3782fa 49 )
7c0ba920 50 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 51 expectedError
8d3782fa
JB
52 )
53 })
54
55 it('Verify that numberOfWorkers is checked', () => {
56 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 57 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
58 )
59 })
60
61 it('Verify that a negative number of workers is checked', () => {
62 expect(
63 () =>
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
65 ).toThrowError(
473c717a
JB
66 new RangeError(
67 'Cannot instantiate a pool with a negative number of workers'
68 )
8d3782fa
JB
69 )
70 })
71
72 it('Verify that a non integer number of workers is checked', () => {
73 expect(
74 () =>
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
76 ).toThrowError(
473c717a 77 new TypeError(
0d80593b 78 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
79 )
80 )
c510fea7 81 })
7c0ba920 82
fd7ebd49 83 it('Verify that pool options are checked', async () => {
7c0ba920
JB
84 let pool = new FixedThreadPool(
85 numberOfWorkers,
86 './tests/worker-files/thread/testWorker.js'
87 )
7c0ba920 88 expect(pool.emitter).toBeDefined()
1f68cede
JB
89 expect(pool.opts.enableEvents).toBe(true)
90 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 91 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 92 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
93 expect(pool.opts.workerChoiceStrategy).toBe(
94 WorkerChoiceStrategies.ROUND_ROBIN
95 )
da309861 96 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 97 runTime: { median: false },
5df69fab
JB
98 waitTime: { median: false },
99 elu: { median: false }
da309861 100 })
35cf1c03
JB
101 expect(pool.opts.messageHandler).toBeUndefined()
102 expect(pool.opts.errorHandler).toBeUndefined()
103 expect(pool.opts.onlineHandler).toBeUndefined()
104 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 105 await pool.destroy()
35cf1c03 106 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
107 pool = new FixedThreadPool(
108 numberOfWorkers,
109 './tests/worker-files/thread/testWorker.js',
110 {
e4543b14 111 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 112 workerChoiceStrategyOptions: {
932fc8be 113 runTime: { median: true },
fc027381 114 weights: { 0: 300, 1: 200 }
49be33fe 115 },
35cf1c03 116 enableEvents: false,
1f68cede 117 restartWorkerOnError: false,
ff733df7 118 enableTasksQueue: true,
d4aeae5a 119 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
120 messageHandler: testHandler,
121 errorHandler: testHandler,
122 onlineHandler: testHandler,
123 exitHandler: testHandler
7c0ba920
JB
124 }
125 )
7c0ba920 126 expect(pool.emitter).toBeUndefined()
1f68cede
JB
127 expect(pool.opts.enableEvents).toBe(false)
128 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 129 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 130 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 131 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 132 WorkerChoiceStrategies.LEAST_USED
e843b904 133 )
da309861 134 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 135 runTime: { median: true },
fc027381 136 weights: { 0: 300, 1: 200 }
da309861 137 })
35cf1c03
JB
138 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
139 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
140 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
141 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 142 await pool.destroy()
7c0ba920
JB
143 })
144
a20f0ba5 145 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
146 expect(
147 () =>
148 new FixedThreadPool(
149 numberOfWorkers,
150 './tests/worker-files/thread/testWorker.js',
151 {
f0d7f803 152 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
153 }
154 )
f0d7f803 155 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
d4aeae5a
JB
156 expect(
157 () =>
158 new FixedThreadPool(
159 numberOfWorkers,
160 './tests/worker-files/thread/testWorker.js',
161 {
f0d7f803 162 workerChoiceStrategyOptions: 'invalidOptions'
d4aeae5a
JB
163 }
164 )
f0d7f803
JB
165 ).toThrowError(
166 'Invalid worker choice strategy options: must be a plain object'
167 )
49be33fe
JB
168 expect(
169 () =>
170 new FixedThreadPool(
171 numberOfWorkers,
172 './tests/worker-files/thread/testWorker.js',
173 {
174 workerChoiceStrategyOptions: { weights: {} }
175 }
176 )
177 ).toThrowError(
178 'Invalid worker choice strategy options: must have a weight for each worker node'
179 )
f0d7f803
JB
180 expect(
181 () =>
182 new FixedThreadPool(
183 numberOfWorkers,
184 './tests/worker-files/thread/testWorker.js',
185 {
186 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
187 }
188 )
189 ).toThrowError(
190 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
191 )
192 expect(
193 () =>
194 new FixedThreadPool(
195 numberOfWorkers,
196 './tests/worker-files/thread/testWorker.js',
197 {
198 enableTasksQueue: true,
199 tasksQueueOptions: { concurrency: 0 }
200 }
201 )
202 ).toThrowError("Invalid worker tasks concurrency '0'")
203 expect(
204 () =>
205 new FixedThreadPool(
206 numberOfWorkers,
207 './tests/worker-files/thread/testWorker.js',
208 {
209 enableTasksQueue: true,
210 tasksQueueOptions: 'invalidTasksQueueOptions'
211 }
212 )
213 ).toThrowError('Invalid tasks queue options: must be a plain object')
214 expect(
215 () =>
216 new FixedThreadPool(
217 numberOfWorkers,
218 './tests/worker-files/thread/testWorker.js',
219 {
220 enableTasksQueue: true,
221 tasksQueueOptions: { concurrency: 0.2 }
222 }
223 )
224 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
d4aeae5a
JB
225 })
226
a20f0ba5
JB
227 it('Verify that worker choice strategy options can be set', async () => {
228 const pool = new FixedThreadPool(
229 numberOfWorkers,
230 './tests/worker-files/thread/testWorker.js',
231 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
232 )
233 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 234 runTime: { median: false },
5df69fab
JB
235 waitTime: { median: false },
236 elu: { median: false }
a20f0ba5
JB
237 })
238 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
239 .workerChoiceStrategies) {
86bf340d 240 expect(workerChoiceStrategy.opts).toStrictEqual({
932fc8be 241 runTime: { median: false },
5df69fab
JB
242 waitTime: { median: false },
243 elu: { median: false }
86bf340d 244 })
a20f0ba5 245 }
87de9ff5
JB
246 expect(
247 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
248 ).toStrictEqual({
932fc8be
JB
249 runTime: {
250 aggregate: true,
251 average: true,
252 median: false
253 },
254 waitTime: {
255 aggregate: false,
256 average: false,
257 median: false
258 },
5df69fab 259 elu: {
9adcefab
JB
260 aggregate: true,
261 average: true,
5df69fab
JB
262 median: false
263 }
86bf340d 264 })
9adcefab
JB
265 pool.setWorkerChoiceStrategyOptions({
266 runTime: { median: true },
267 elu: { median: true }
268 })
a20f0ba5 269 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
270 runTime: { median: true },
271 elu: { median: true }
a20f0ba5
JB
272 })
273 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
274 .workerChoiceStrategies) {
932fc8be 275 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
276 runTime: { median: true },
277 elu: { median: true }
932fc8be 278 })
a20f0ba5 279 }
87de9ff5
JB
280 expect(
281 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
282 ).toStrictEqual({
932fc8be
JB
283 runTime: {
284 aggregate: true,
285 average: false,
286 median: true
287 },
288 waitTime: {
289 aggregate: false,
290 average: false,
291 median: false
292 },
5df69fab 293 elu: {
9adcefab 294 aggregate: true,
5df69fab 295 average: false,
9adcefab 296 median: true
5df69fab 297 }
86bf340d 298 })
9adcefab
JB
299 pool.setWorkerChoiceStrategyOptions({
300 runTime: { median: false },
301 elu: { median: false }
302 })
a20f0ba5 303 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
304 runTime: { median: false },
305 elu: { median: false }
a20f0ba5
JB
306 })
307 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
308 .workerChoiceStrategies) {
932fc8be 309 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
310 runTime: { median: false },
311 elu: { median: false }
932fc8be 312 })
a20f0ba5 313 }
87de9ff5
JB
314 expect(
315 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
316 ).toStrictEqual({
932fc8be
JB
317 runTime: {
318 aggregate: true,
319 average: true,
320 median: false
321 },
322 waitTime: {
323 aggregate: false,
324 average: false,
325 median: false
326 },
5df69fab 327 elu: {
9adcefab
JB
328 aggregate: true,
329 average: true,
5df69fab
JB
330 median: false
331 }
86bf340d 332 })
1f95d544
JB
333 expect(() =>
334 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
335 ).toThrowError(
336 'Invalid worker choice strategy options: must be a plain object'
337 )
338 expect(() =>
339 pool.setWorkerChoiceStrategyOptions({ weights: {} })
340 ).toThrowError(
341 'Invalid worker choice strategy options: must have a weight for each worker node'
342 )
343 expect(() =>
344 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
345 ).toThrowError(
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
347 )
a20f0ba5
JB
348 await pool.destroy()
349 })
350
351 it('Verify that tasks queue can be enabled/disabled', async () => {
352 const pool = new FixedThreadPool(
353 numberOfWorkers,
354 './tests/worker-files/thread/testWorker.js'
355 )
356 expect(pool.opts.enableTasksQueue).toBe(false)
357 expect(pool.opts.tasksQueueOptions).toBeUndefined()
358 pool.enableTasksQueue(true)
359 expect(pool.opts.enableTasksQueue).toBe(true)
360 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
361 pool.enableTasksQueue(true, { concurrency: 2 })
362 expect(pool.opts.enableTasksQueue).toBe(true)
363 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
364 pool.enableTasksQueue(false)
365 expect(pool.opts.enableTasksQueue).toBe(false)
366 expect(pool.opts.tasksQueueOptions).toBeUndefined()
367 await pool.destroy()
368 })
369
370 it('Verify that tasks queue options can be set', async () => {
371 const pool = new FixedThreadPool(
372 numberOfWorkers,
373 './tests/worker-files/thread/testWorker.js',
374 { enableTasksQueue: true }
375 )
376 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
377 pool.setTasksQueueOptions({ concurrency: 2 })
378 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
379 expect(() =>
380 pool.setTasksQueueOptions('invalidTasksQueueOptions')
381 ).toThrowError('Invalid tasks queue options: must be a plain object')
a20f0ba5
JB
382 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
383 "Invalid worker tasks concurrency '0'"
384 )
f0d7f803
JB
385 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
386 'Invalid worker tasks concurrency: must be an integer'
387 )
a20f0ba5
JB
388 await pool.destroy()
389 })
390
6b27d407
JB
391 it('Verify that pool info is set', async () => {
392 let pool = new FixedThreadPool(
393 numberOfWorkers,
394 './tests/worker-files/thread/testWorker.js'
395 )
396 expect(pool.info).toStrictEqual({
23ccf9d7 397 version,
6b27d407 398 type: PoolTypes.fixed,
184855e6 399 worker: WorkerTypes.thread,
6b27d407
JB
400 minSize: numberOfWorkers,
401 maxSize: numberOfWorkers,
402 workerNodes: numberOfWorkers,
403 idleWorkerNodes: numberOfWorkers,
404 busyWorkerNodes: 0,
a4e07f72
JB
405 executedTasks: 0,
406 executingTasks: 0,
6b27d407 407 queuedTasks: 0,
a4e07f72
JB
408 maxQueuedTasks: 0,
409 failedTasks: 0
6b27d407
JB
410 })
411 await pool.destroy()
412 pool = new DynamicClusterPool(
413 numberOfWorkers,
414 numberOfWorkers * 2,
ecdfbdc0 415 './tests/worker-files/cluster/testWorker.js'
6b27d407
JB
416 )
417 expect(pool.info).toStrictEqual({
23ccf9d7 418 version,
6b27d407 419 type: PoolTypes.dynamic,
184855e6 420 worker: WorkerTypes.cluster,
6b27d407
JB
421 minSize: numberOfWorkers,
422 maxSize: numberOfWorkers * 2,
423 workerNodes: numberOfWorkers,
424 idleWorkerNodes: numberOfWorkers,
425 busyWorkerNodes: 0,
a4e07f72
JB
426 executedTasks: 0,
427 executingTasks: 0,
6b27d407 428 queuedTasks: 0,
a4e07f72
JB
429 maxQueuedTasks: 0,
430 failedTasks: 0
6b27d407
JB
431 })
432 await pool.destroy()
433 })
434
9d16d33e 435 it('Simulate worker not found', async () => {
a8884ffd 436 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4 437 numberOfWorkers,
465b2940 438 './tests/worker-files/thread/testWorker.js',
10fcfaf4 439 {
10fcfaf4
JB
440 errorHandler: e => console.error(e)
441 }
442 )
d4aeae5a 443 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
444 // Simulate worker not found.
445 pool.removeAllWorker()
d4aeae5a 446 expect(pool.workerNodes.length).toBe(0)
fd7ebd49 447 await pool.destroy()
bf9549ae
JB
448 })
449
fd7ebd49 450 it('Verify that worker pool tasks usage are initialized', async () => {
bf9549ae
JB
451 const pool = new FixedClusterPool(
452 numberOfWorkers,
453 './tests/worker-files/cluster/testWorker.js'
454 )
f06e48d8 455 for (const workerNode of pool.workerNodes) {
465b2940 456 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
457 tasks: {
458 executed: 0,
459 executing: 0,
460 queued: 0,
df593701 461 maxQueued: 0,
a4e07f72
JB
462 failed: 0
463 },
464 runTime: {
932fc8be 465 aggregate: 0,
f7510105
JB
466 maximum: 0,
467 minimum: 0,
a4e07f72
JB
468 average: 0,
469 median: 0,
470 history: expect.any(CircularArray)
471 },
472 waitTime: {
932fc8be 473 aggregate: 0,
f7510105
JB
474 maximum: 0,
475 minimum: 0,
a4e07f72
JB
476 average: 0,
477 median: 0,
478 history: expect.any(CircularArray)
479 },
5df69fab
JB
480 elu: {
481 idle: {
482 aggregate: 0,
f7510105
JB
483 maximum: 0,
484 minimum: 0,
5df69fab
JB
485 average: 0,
486 median: 0,
487 history: expect.any(CircularArray)
488 },
489 active: {
490 aggregate: 0,
f7510105
JB
491 maximum: 0,
492 minimum: 0,
5df69fab
JB
493 average: 0,
494 median: 0,
495 history: expect.any(CircularArray)
f7510105 496 }
5df69fab 497 }
86bf340d 498 })
f06e48d8
JB
499 }
500 await pool.destroy()
501 })
502
503 it('Verify that worker pool tasks queue are initialized', async () => {
504 const pool = new FixedClusterPool(
505 numberOfWorkers,
506 './tests/worker-files/cluster/testWorker.js'
507 )
508 for (const workerNode of pool.workerNodes) {
509 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 510 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 511 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 512 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 513 }
fd7ebd49 514 await pool.destroy()
bf9549ae
JB
515 })
516
517 it('Verify that worker pool tasks usage are computed', async () => {
518 const pool = new FixedClusterPool(
519 numberOfWorkers,
520 './tests/worker-files/cluster/testWorker.js'
521 )
09c2d0d3 522 const promises = new Set()
fc027381
JB
523 const maxMultiplier = 2
524 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 525 promises.add(pool.execute())
bf9549ae 526 }
f06e48d8 527 for (const workerNode of pool.workerNodes) {
465b2940 528 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
529 tasks: {
530 executed: 0,
531 executing: maxMultiplier,
532 queued: 0,
df593701 533 maxQueued: 0,
a4e07f72
JB
534 failed: 0
535 },
536 runTime: {
932fc8be 537 aggregate: 0,
f7510105
JB
538 maximum: 0,
539 minimum: 0,
a4e07f72
JB
540 average: 0,
541 median: 0,
542 history: expect.any(CircularArray)
543 },
544 waitTime: {
932fc8be 545 aggregate: 0,
f7510105
JB
546 maximum: 0,
547 minimum: 0,
a4e07f72
JB
548 average: 0,
549 median: 0,
550 history: expect.any(CircularArray)
551 },
5df69fab
JB
552 elu: {
553 idle: {
554 aggregate: 0,
f7510105
JB
555 maximum: 0,
556 minimum: 0,
5df69fab
JB
557 average: 0,
558 median: 0,
559 history: expect.any(CircularArray)
560 },
561 active: {
562 aggregate: 0,
f7510105
JB
563 maximum: 0,
564 minimum: 0,
5df69fab
JB
565 average: 0,
566 median: 0,
567 history: expect.any(CircularArray)
f7510105 568 }
5df69fab 569 }
86bf340d 570 })
bf9549ae
JB
571 }
572 await Promise.all(promises)
f06e48d8 573 for (const workerNode of pool.workerNodes) {
465b2940 574 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
575 tasks: {
576 executed: maxMultiplier,
577 executing: 0,
578 queued: 0,
df593701 579 maxQueued: 0,
a4e07f72
JB
580 failed: 0
581 },
582 runTime: {
932fc8be 583 aggregate: 0,
f7510105
JB
584 maximum: 0,
585 minimum: 0,
a4e07f72
JB
586 average: 0,
587 median: 0,
588 history: expect.any(CircularArray)
589 },
590 waitTime: {
932fc8be 591 aggregate: 0,
f7510105
JB
592 maximum: 0,
593 minimum: 0,
a4e07f72
JB
594 average: 0,
595 median: 0,
596 history: expect.any(CircularArray)
597 },
5df69fab
JB
598 elu: {
599 idle: {
600 aggregate: 0,
f7510105
JB
601 maximum: 0,
602 minimum: 0,
5df69fab
JB
603 average: 0,
604 median: 0,
605 history: expect.any(CircularArray)
606 },
607 active: {
608 aggregate: 0,
f7510105
JB
609 maximum: 0,
610 minimum: 0,
5df69fab
JB
611 average: 0,
612 median: 0,
613 history: expect.any(CircularArray)
f7510105 614 }
5df69fab 615 }
86bf340d 616 })
bf9549ae 617 }
fd7ebd49 618 await pool.destroy()
bf9549ae
JB
619 })
620
ee11a4a2 621 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 622 const pool = new DynamicThreadPool(
9e619829 623 numberOfWorkers,
8f4878b7 624 numberOfWorkers,
9e619829
JB
625 './tests/worker-files/thread/testWorker.js'
626 )
09c2d0d3 627 const promises = new Set()
ee9f5295
JB
628 const maxMultiplier = 2
629 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 630 promises.add(pool.execute())
9e619829
JB
631 }
632 await Promise.all(promises)
f06e48d8 633 for (const workerNode of pool.workerNodes) {
465b2940 634 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
635 tasks: {
636 executed: expect.any(Number),
637 executing: 0,
638 queued: 0,
df593701 639 maxQueued: 0,
a4e07f72
JB
640 failed: 0
641 },
642 runTime: {
932fc8be 643 aggregate: 0,
f7510105
JB
644 maximum: 0,
645 minimum: 0,
a4e07f72
JB
646 average: 0,
647 median: 0,
648 history: expect.any(CircularArray)
649 },
650 waitTime: {
932fc8be 651 aggregate: 0,
f7510105
JB
652 maximum: 0,
653 minimum: 0,
a4e07f72
JB
654 average: 0,
655 median: 0,
656 history: expect.any(CircularArray)
657 },
5df69fab
JB
658 elu: {
659 idle: {
660 aggregate: 0,
f7510105
JB
661 maximum: 0,
662 minimum: 0,
5df69fab
JB
663 average: 0,
664 median: 0,
665 history: expect.any(CircularArray)
666 },
667 active: {
668 aggregate: 0,
f7510105
JB
669 maximum: 0,
670 minimum: 0,
5df69fab
JB
671 average: 0,
672 median: 0,
673 history: expect.any(CircularArray)
f7510105 674 }
5df69fab 675 }
86bf340d 676 })
465b2940
JB
677 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
678 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
9e619829
JB
679 }
680 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 681 for (const workerNode of pool.workerNodes) {
465b2940 682 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
683 tasks: {
684 executed: 0,
685 executing: 0,
686 queued: 0,
df593701 687 maxQueued: 0,
a4e07f72
JB
688 failed: 0
689 },
690 runTime: {
932fc8be 691 aggregate: 0,
f7510105
JB
692 maximum: 0,
693 minimum: 0,
a4e07f72
JB
694 average: 0,
695 median: 0,
696 history: expect.any(CircularArray)
697 },
698 waitTime: {
932fc8be 699 aggregate: 0,
f7510105
JB
700 maximum: 0,
701 minimum: 0,
a4e07f72
JB
702 average: 0,
703 median: 0,
704 history: expect.any(CircularArray)
705 },
5df69fab
JB
706 elu: {
707 idle: {
708 aggregate: 0,
f7510105
JB
709 maximum: 0,
710 minimum: 0,
5df69fab
JB
711 average: 0,
712 median: 0,
713 history: expect.any(CircularArray)
714 },
715 active: {
716 aggregate: 0,
f7510105
JB
717 maximum: 0,
718 minimum: 0,
5df69fab
JB
719 average: 0,
720 median: 0,
721 history: expect.any(CircularArray)
f7510105 722 }
5df69fab 723 }
86bf340d 724 })
465b2940
JB
725 expect(workerNode.usage.runTime.history.length).toBe(0)
726 expect(workerNode.usage.waitTime.history.length).toBe(0)
ee11a4a2 727 }
fd7ebd49 728 await pool.destroy()
ee11a4a2
JB
729 })
730
164d950a
JB
731 it("Verify that pool event emitter 'full' event can register a callback", async () => {
732 const pool = new DynamicThreadPool(
733 numberOfWorkers,
734 numberOfWorkers,
735 './tests/worker-files/thread/testWorker.js'
736 )
09c2d0d3 737 const promises = new Set()
164d950a 738 let poolFull = 0
d46660cd
JB
739 let poolInfo
740 pool.emitter.on(PoolEvents.full, info => {
741 ++poolFull
742 poolInfo = info
743 })
164d950a 744 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 745 promises.add(pool.execute())
164d950a
JB
746 }
747 await Promise.all(promises)
594bfb84 748 // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
fc027381
JB
749 // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
750 expect(poolFull).toBe(numberOfWorkers * 2)
d46660cd 751 expect(poolInfo).toStrictEqual({
23ccf9d7 752 version,
d46660cd
JB
753 type: PoolTypes.dynamic,
754 worker: WorkerTypes.thread,
755 minSize: expect.any(Number),
756 maxSize: expect.any(Number),
757 workerNodes: expect.any(Number),
758 idleWorkerNodes: expect.any(Number),
759 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
760 executedTasks: expect.any(Number),
761 executingTasks: expect.any(Number),
d46660cd 762 queuedTasks: expect.any(Number),
a4e07f72
JB
763 maxQueuedTasks: expect.any(Number),
764 failedTasks: expect.any(Number)
d46660cd 765 })
164d950a
JB
766 await pool.destroy()
767 })
768
cf597bc5 769 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
770 const pool = new FixedThreadPool(
771 numberOfWorkers,
772 './tests/worker-files/thread/testWorker.js'
773 )
09c2d0d3 774 const promises = new Set()
7c0ba920 775 let poolBusy = 0
d46660cd
JB
776 let poolInfo
777 pool.emitter.on(PoolEvents.busy, info => {
778 ++poolBusy
779 poolInfo = info
780 })
7c0ba920 781 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 782 promises.add(pool.execute())
7c0ba920 783 }
cf597bc5 784 await Promise.all(promises)
14916bf9
JB
785 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
786 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
787 expect(poolBusy).toBe(numberOfWorkers + 1)
d46660cd 788 expect(poolInfo).toStrictEqual({
23ccf9d7 789 version,
d46660cd
JB
790 type: PoolTypes.fixed,
791 worker: WorkerTypes.thread,
792 minSize: expect.any(Number),
793 maxSize: expect.any(Number),
794 workerNodes: expect.any(Number),
795 idleWorkerNodes: expect.any(Number),
796 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
797 executedTasks: expect.any(Number),
798 executingTasks: expect.any(Number),
d46660cd 799 queuedTasks: expect.any(Number),
a4e07f72
JB
800 maxQueuedTasks: expect.any(Number),
801 failedTasks: expect.any(Number)
d46660cd 802 })
fd7ebd49 803 await pool.destroy()
7c0ba920 804 })
70a4f5ea
JB
805
806 it('Verify that multiple tasks worker is working', async () => {
807 const pool = new DynamicClusterPool(
808 numberOfWorkers,
809 numberOfWorkers * 2,
810 './tests/worker-files/cluster/testMultiTasksWorker.js'
811 )
812 const data = { n: 10 }
82888165
JB
813 const result0 = await pool.execute(data)
814 expect(result0).toBe(false)
70a4f5ea
JB
815 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
816 expect(result1).toBe(false)
817 const result2 = await pool.execute(data, 'factorial')
818 expect(result2).toBe(3628800)
819 const result3 = await pool.execute(data, 'fibonacci')
024daf59 820 expect(result3).toBe(55)
70a4f5ea 821 })
3ec964d6 822})