Merge branch 'master' into worker-info
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
e1ffb94f
JB
14
15describe('Abstract pool test suite', () => {
fc027381 16 const numberOfWorkers = 2
/**
 * Fixed thread pool stub adding a `removeAllWorker()` method, used by the
 * suite to simulate a pool that has no worker node left.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears the promise response map and replaces the worker nodes list with
   * an empty array.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workerNodes = []
  }
}
/**
 * Fixed thread pool stub overriding `isMain()` so that it always returns
 * `false`; the suite uses it to simulate a pool created from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 28
it('Simulate pool creation from a non main thread/process', () => {
  // The stub's isMain() returns false, so the constructor is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
41
it('Verify that filePath is checked', () => {
  const missingFilePathError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Both a missing and an empty file path must be rejected the same way.
  const poolFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of poolFactories) {
    expect(createPool).toThrowError(missingFilePathError)
  }
})
53
it('Verify that numberOfWorkers is checked', () => {
  // No argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
it('Verify that a negative number of workers is checked', () => {
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createPoolWithNegativeSize).toThrowError(negativeSizeError)
})
70
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithFractionalSize).toThrowError(nonIntegerSizeError)
})
7c0ba920 81
it('Verify that pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // Pool created without options: check the values the pool ends up with.
  let pool = new FixedThreadPool(numberOfWorkers, workerFilePath)
  expect(pool.emitter).toBeDefined()
  expect(pool.opts.enableEvents).toBe(true)
  expect(pool.opts.restartWorkerOnError).toBe(true)
  expect(pool.opts.enableTasksQueue).toBe(false)
  expect(pool.opts.tasksQueueOptions).toBeUndefined()
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerName of handlerNames) {
    expect(pool.opts[handlerName]).toBeUndefined()
  }
  await pool.destroy()

  // Pool created with explicit options: each one must be found in pool.opts.
  const testHandler = () => console.log('test handler executed')
  pool = new FixedThreadPool(numberOfWorkers, workerFilePath, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(pool.emitter).toBeUndefined()
  expect(pool.opts.enableEvents).toBe(false)
  expect(pool.opts.restartWorkerOnError).toBe(false)
  expect(pool.opts.enableTasksQueue).toBe(true)
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(pool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of handlerNames) {
    expect(pool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await pool.destroy()
})
143
it('Verify that pool options are validated', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs invalid pool options with the expected error message.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: 'invalidOptions' },
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      'Invalid tasks queue options: must be a plain object'
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFilePath, invalidOptions)
    ).toThrowError(expectedMessage)
  }
})
225
a20f0ba5
JB
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Builds a fresh options object on every call so that the value given to
  // the pool and the value used in assertions are never the same reference.
  const medianOptions = median => ({
    runTime: { median },
    elu: { median }
  })

  // Checks the options held by the pool and by every registered strategy.
  const expectStrategyOptions = buildExpected => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      buildExpected()
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(buildExpected())
    }
  }

  // Checks the task statistics requirements; only the average/median flags
  // of runTime and elu vary between the stages of this test.
  const expectTaskStatisticsRequirements = (average, median) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average, median }
    })
  }

  // Initial state.
  expectStrategyOptions(() => ({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }))
  expectTaskStatisticsRequirements(true, false)

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions(medianOptions(true))
  expectStrategyOptions(() => medianOptions(true))
  expectTaskStatisticsRequirements(false, true)

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions(medianOptions(false))
  expectStrategyOptions(() => medianOptions(false))
  expectTaskStatisticsRequirements(true, false)

  // Invalid options must be rejected.
  expect(() =>
    pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
  ).toThrowError(
    'Invalid worker choice strategy options: must be a plain object'
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ weights: {} })
  ).toThrowError(
    'Invalid worker choice strategy options: must have a weight for each worker node'
  )
  expect(() =>
    pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
  ).toThrowError(
    "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
  )
  await pool.destroy()
})
349
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the tasks queue flag and options currently held by the pool;
  // `undefined` options mean that no tasks queue options are expected.
  const expectTasksQueueState = (enabled, tasksQueueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (tasksQueueOptions === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(tasksQueueOptions)
    }
  }
  expectTasksQueueState(false, undefined)
  pool.enableTasksQueue(true)
  expectTasksQueueState(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueState(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueueState(false, undefined)
  await pool.destroy()
})
368
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // Each entry pairs invalid tasks queue options with the expected message.
  const invalidTasksQueueOptionsCases = [
    [
      'invalidTasksQueueOptions',
      'Invalid tasks queue options: must be a plain object'
    ],
    [{ concurrency: 0 }, "Invalid worker tasks concurrency '0'"],
    [
      { concurrency: 0.2 },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidTasksQueueOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedMessage
    )
  }
  await pool.destroy()
})
389
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool: min and max sizes both equal the number of workers.
  let pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    utilization: 0,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
  // Dynamic cluster pool: the max size is twice the min size, and only the
  // min number of worker nodes is expected right after creation.
  pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(pool.info).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers * 2,
    utilization: 0,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })
  await pool.destroy()
})
439
it('Simulate worker not found', async () => {
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    poolOptions
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Drop every worker node to simulate workers that cannot be found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  await pool.destroy()
})
454
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Measurement statistics with every number at zero and a history that is
  // only required to be a CircularArray instance.
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual({
      tasks: {
        executed: 0,
        executing: 0,
        queued: 0,
        maxQueued: 0,
        failed: 0
      },
      runTime: zeroedStatistics(),
      waitTime: zeroedStatistics(),
      elu: {
        idle: zeroedStatistics(),
        active: zeroedStatistics(),
        utilization: 0
      }
    })
  }
  await pool.destroy()
})
500
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node must own an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size).toBe(0)
    expect(tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
})
514
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  // Measurement statistics with every number at zero and a history that is
  // only required to be a CircularArray instance.
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Expected worker usage: only the executed/executing counters vary.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: zeroedStatistics(),
    waitTime: zeroedStatistics(),
    elu: {
      idle: zeroedStatistics(),
      active: zeroedStatistics(),
      utilization: 0
    }
  })
  const promises = new Set()
  const tasksCount = numberOfWorkers * maxMultiplier
  for (let submitted = 0; submitted < tasksCount; submitted++) {
    promises.add(pool.execute())
  }
  // Before the tasks settle, each worker node reports them as executing.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(promises)
  // Once settled, the same tasks are reported as executed.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
604
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  // Measurement statistics with every number at zero and a history that is
  // only required to be a CircularArray instance.
  const zeroedStatistics = () => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: expect.any(CircularArray)
  })
  // Expected worker usage: only the executed tasks counter varies.
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: zeroedStatistics(),
    waitTime: zeroedStatistics(),
    elu: {
      idle: zeroedStatistics(),
      active: zeroedStatistics(),
      utilization: 0
    }
  })
  const promises = new Set()
  const tasksCount = numberOfWorkers * maxMultiplier
  for (let submitted = 0; submitted < tasksCount; submitted++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // After the tasks settle, each worker node has executed at least one task
  // and at most maxMultiplier tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Changing the strategy must bring the usage back to its zeroed state.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
  }
  await pool.destroy()
})
700
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolFull = 0
  let poolInfo
  // Count the `full` events and keep the info carried by the last one.
  pool.emitter.on(PoolEvents.full, info => {
    poolFull += 1
    poolInfo = info
  })
  const promises = new Set()
  const tasksCount = numberOfWorkers * 2
  for (let submitted = 0; submitted < tasksCount; submitted++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
  expect(poolFull).toBe(numberOfWorkers * 2)
  const anyNumber = expect.any(Number)
  expect(poolInfo).toStrictEqual({
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    minSize: anyNumber,
    maxSize: anyNumber,
    utilization: 0,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    queuedTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
738
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolBusy = 0
  let poolInfo
  // Count the `busy` events and keep the info carried by the last one.
  pool.emitter.on(PoolEvents.busy, info => {
    poolBusy += 1
    poolInfo = info
  })
  const promises = new Set()
  const tasksCount = numberOfWorkers * 2
  for (let submitted = 0; submitted < tasksCount; submitted++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(poolInfo).toStrictEqual({
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    minSize: anyNumber,
    maxSize: anyNumber,
    utilization: 0,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    queuedTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
70a4f5ea
JB
775
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    numberOfWorkers,
    numberOfWorkers * 2,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task function name given.
  const result0 = await pool.execute(data)
  expect(result0).toBe(false)
  // Task function selected by name.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toBe(false)
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  // Release the cluster workers, as every other test of this suite does.
  await pool.destroy()
})
3ec964d6 792})