feat: add support for tasks ELU in fair share strategy
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
14
15describe('Abstract pool test suite', () => {
fc027381 16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub adding a helper that drops every worker node, so a
 * test can simulate a pool whose workers are no longer known.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the worker nodes list with an empty one and clears the promise
   * response map.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so a test
 * can simulate a pool being created from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 28
it('Simulate pool creation from a non main thread/process', () => {
  // The stub's isMain() returns false, so building the pool is expected to
  // throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
41
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Both an absent and an empty worker file path must be rejected.
  const poolFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of poolFactories) {
    expect(createPool).toThrowError(missingFilePathError)
  }
})
53
it('Verify that numberOfWorkers is checked', () => {
  // No constructor argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
it('Verify that a negative number of workers is checked', () => {
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(negativeSizeError)
})
70
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(nonIntegerSizeError)
})
7c0ba920 81
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: no options given, so the expected default values are checked.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pool: every option is given explicitly and must be kept as is.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
143
it('Verify that pool options are validated', async () => {
  // Each entry pairs invalid pool options with the error message expected
  // from the pool constructor.
  const invalidOptionsCases = [
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        invalidOptions
      )
    expect(createPool).toThrowError(expectedMessage)
  }
})
179
a20f0ba5
JB
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Options handed to setWorkerChoiceStrategyOptions(); a fresh object is
  // built on every call so the pool never shares a reference with the
  // expected value.
  const medianOptions = median => ({
    runTime: { median },
    elu: { median }
  })

  // Task statistics requirements expected for a given `median` setting:
  // run time and ELU use either the average or the median, wait time
  // statistics are not required.
  const expectedRequirements = median => ({
    runTime: { aggregate: true, average: !median, median },
    waitTime: { aggregate: false, average: false, median: false },
    elu: { aggregate: true, average: !median, median }
  })

  // Checks the options stored in the pool, then in each worker choice
  // strategy, then the resulting task statistics requirements.
  const expectStrategyState = (buildExpectedOptions, median) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      buildExpectedOptions()
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(buildExpectedOptions())
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements(median))
  }

  // Right after creation: default options.
  expectStrategyState(
    () => ({
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    }),
    false
  )

  // Switch run time and ELU to the median.
  pool.setWorkerChoiceStrategyOptions(medianOptions(true))
  expectStrategyState(() => medianOptions(true), true)

  // Switch run time and ELU back to the average.
  pool.setWorkerChoiceStrategyOptions(medianOptions(false))
  expectStrategyState(() => medianOptions(false), false)

  await pool.destroy()
})
288
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Asserts the enablement flag together with the stored queue options;
  // omit `queueOptions` when no options are expected to be set.
  const expectTasksQueueState = (enabled, queueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (queueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(queueOptions)
    }
  }

  expectTasksQueueState(false)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false)
  await pool.destroy()
})
307
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  const expectQueueConcurrency = concurrency => {
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
  }

  // Queue enabled without explicit options.
  expectQueueConcurrency(1)
  pool.setTasksQueueOptions({ concurrency: 2 })
  expectQueueConcurrency(2)

  // A concurrency of zero must be refused.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
322
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max sizes are both the number of workers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()

  // Dynamic cluster pool: it must be given the cluster worker file (the
  // thread worker file was previously passed here by mistake).
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
364
it('Simulate worker not found', async () => {
  // The stub extends FixedThreadPool, so it is given the thread worker file
  // (the cluster worker file was previously passed here by mistake).
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
379
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )

  // Zeroed measurement statistics with a history of the expected type.
  const zeroedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })

  pool.workerNodes.forEach(workerNode => {
    expect(workerNode.workerUsage).toStrictEqual({
      tasks: { executed: 0, executing: 0, queued: 0, failed: 0 },
      runTime: zeroedMeasurement(),
      waitTime: zeroedMeasurement(),
      elu: {
        idle: zeroedMeasurement(),
        active: zeroedMeasurement(),
        utilization: 0
      }
    })
  })
  await pool.destroy()
})
424
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must start with an empty Queue instance.
  pool.workerNodes.forEach(({ tasksQueue }) => {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
  })
  await pool.destroy()
})
437
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2

  // Zeroed measurement statistics with a history of the expected type.
  const zeroedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })

  // Asserts that every worker node reports the given task counters and
  // otherwise zeroed measurement statistics.
  const expectWorkerUsages = ({ executed, executing }) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.workerUsage).toStrictEqual({
        tasks: { executed, executing, queued: 0, failed: 0 },
        runTime: zeroedMeasurement(),
        waitTime: zeroedMeasurement(),
        elu: {
          idle: zeroedMeasurement(),
          active: zeroedMeasurement(),
          utilization: 0
        }
      })
    }
  }

  // Submit maxMultiplier tasks per worker without waiting for them.
  const promises = Array.from(
    { length: numberOfWorkers * maxMultiplier },
    () => pool.execute()
  )
  expectWorkerUsages({ executed: 0, executing: maxMultiplier })

  // Once all tasks are settled they are accounted as executed.
  await Promise.all(promises)
  expectWorkerUsages({ executed: maxMultiplier, executing: 0 })
  await pool.destroy()
})
525
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2

  // Zeroed measurement statistics with a history of the expected type.
  const zeroedMeasurement = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })

  // Expected worker usage for a given `executed` value (or matcher), all
  // other counters and statistics being zero.
  const expectedUsage = executed => ({
    tasks: { executed, executing: 0, queued: 0, failed: 0 },
    runTime: zeroedMeasurement(),
    waitTime: zeroedMeasurement(),
    elu: {
      idle: zeroedMeasurement(),
      active: zeroedMeasurement(),
      utilization: 0
    }
  })

  // Run maxMultiplier tasks per worker to completion.
  await Promise.all(
    Array.from({ length: numberOfWorkers * maxMultiplier }, () =>
      pool.execute()
    )
  )
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerUsage.tasks.executed).toBeGreaterThan(0)
    expect(workerUsage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
  }

  // Changing the worker choice strategy must reset the usage statistics.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const { workerUsage } of pool.workerNodes) {
    expect(workerUsage).toStrictEqual(expectedUsage(0))
    expect(workerUsage.runTime.history.length).toBe(0)
    expect(workerUsage.waitTime.history.length).toBe(0)
  }
  await pool.destroy()
})
621
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  let poolFull = 0
  let poolInfo
  pool.emitter.on(PoolEvents.full, info => {
    poolFull += 1
    poolInfo = info
  })
  await Promise.all(
    Array.from({ length: submittedTasks }, () => pool.execute())
  )
  // The `full` event fires when the number of tasks submitted at once
  // reaches the max number of workers of the dynamic pool. With
  // min = max = numberOfWorkers and numberOfWorkers * 2 tasks submitted in a
  // loop, it is expected numberOfWorkers * 2 times in total.
  expect(poolFull).toBe(submittedTasks)
  const numericInfoFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'queuedTasks',
    'maxQueuedTasks',
    'failedTasks'
  ]
  expect(poolInfo).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ...Object.fromEntries(
      numericInfoFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
658
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolBusy = 0
  let poolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    poolBusy += 1
    poolInfo = info
  })
  await Promise.all(
    Array.from({ length: numberOfWorkers * 2 }, () => pool.execute())
  )
  // The `busy` event fires when the number of tasks submitted at once
  // reaches the number of workers of the fixed pool. With
  // numberOfWorkers * 2 tasks submitted in a loop, it is expected
  // numberOfWorkers + 1 times in total.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  const numericInfoFields = [
    'minSize',
    'maxSize',
    'workerNodes',
    'idleWorkerNodes',
    'busyWorkerNodes',
    'executedTasks',
    'executingTasks',
    'queuedTasks',
    'maxQueuedTasks',
    'failedTasks'
  ]
  expect(poolInfo).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ...Object.fromEntries(
      numericInfoFields.map(field => [field, expect.any(Number)])
    )
  })
  await pool.destroy()
})
70a4f5ea
JB
694
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task function name given.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Task functions selected by name.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(89)
  // Release the workers like every other test does, so they do not outlive
  // this test.
  await pool.destroy()
})
3ec964d6 711})