feat: add dynamic pool sizes type check
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
85aeb3f3 1const { MessageChannel } = require('worker_threads')
a61a0724 2const { expect } = require('expect')
e843b904 3const {
70a4f5ea 4 DynamicClusterPool,
9e619829 5 DynamicThreadPool,
aee46736 6 FixedClusterPool,
e843b904 7 FixedThreadPool,
aee46736 8 PoolEvents,
184855e6 9 PoolTypes,
3d6dd312 10 WorkerChoiceStrategies,
184855e6 11 WorkerTypes
cdace0e5 12} = require('../../../lib')
78099a15 13const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 14const { Queue } = require('../../../lib/queue')
23ccf9d7 15const { version } = require('../../../package.json')
2431bdb4 16const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
17
18describe('Abstract pool test suite', () => {
fc027381 19 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, so that a
 * pool construction can be attempted as if it did not come from the main
 * thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 25
it('Simulate pool creation from a non main thread/process', () => {
  // The stub's isMain() returns false, so the construction must be refused.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
38
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // File path omitted altogether.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Empty string and non string file paths.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A file path string pointing to a file the pool cannot find.
  const unknownFileError = new Error(
    "Cannot find the worker file './dummyWorker.ts'"
  )
  expect(
    () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  ).toThrowError(unknownFileError)
})
59
it('Verify that numberOfWorkers is checked', () => {
  // No constructor argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
65
it('Verify that a negative number of workers is checked', () => {
  const negativeSizeError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(negativeSizeError)
})
76
it('Verify that a non integer number of workers is checked', () => {
  const nonIntegerSizeError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(nonIntegerSizeError)
})
7c0ba920 87
it('Verify that dynamic pool sizing is checked', () => {
  // Each pool flavour gets its matching worker file: cluster pools must not
  // be handed the thread worker implementation.
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'

  // Non integer minimum pool size.
  expect(
    () => new DynamicThreadPool(0.5, 1, threadWorkerFile)
  ).toThrowError(
    new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  )
  // Non integer maximum pool size.
  expect(
    () => new DynamicClusterPool(0, 0.5, clusterWorkerFile)
  ).toThrowError(
    new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  )
  // Maximum pool size lower than the minimum pool size.
  expect(
    () => new DynamicThreadPool(2, 1, threadWorkerFile)
  ).toThrowError(
    new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  )
  // Minimum pool size equal to the maximum pool size.
  expect(
    () => new DynamicClusterPool(1, 1, clusterWorkerFile)
  ).toThrowError(
    new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  )
  // Both pool sizes equal to zero.
  expect(
    () => new DynamicThreadPool(0, 0, threadWorkerFile)
  ).toThrowError(
    new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  )
})
142
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool created without any option: only defaults are expected.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerName of [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Pool created with every option explicitly set.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
204
it('Verify that pool options are validated', async () => {
  // Each entry pairs invalid pool options with the error message the pool
  // constructor is expected to throw for them.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: 'invalidOptions' },
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      'Invalid tasks queue options: must be a plain object'
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    const createPool = () =>
      new FixedThreadPool(
        numberOfWorkers,
        './tests/worker-files/thread/testWorker.js',
        invalidOptions
      )
    expect(createPool).toThrowError(expectedMessage)
  }
})
286
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Builds a fresh { runTime, elu } options object for the given median flag.
  const medianOptions = median => ({
    runTime: { median },
    elu: { median }
  })
  // Task statistics requirements expected for the given median flag.
  const expectedRequirements = median => ({
    runTime: {
      aggregate: true,
      average: !median,
      median
    },
    waitTime: {
      aggregate: false,
      average: false,
      median: false
    },
    elu: {
      aggregate: true,
      average: !median,
      median
    }
  })
  // Asserts the options held by the pool and by each worker choice strategy,
  // then the resulting task statistics requirements.
  const expectStrategyState = (expectedOptions, median) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    for (const [, strategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(strategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual(expectedRequirements(median))
  }

  // Defaults right after pool creation.
  expectStrategyState(
    {
      runTime: { median: false },
      waitTime: { median: false },
      elu: { median: false }
    },
    false
  )

  // Switch to median, then back to average.
  for (const median of [true, false]) {
    pool.setWorkerChoiceStrategyOptions(medianOptions(median))
    expectStrategyState(medianOptions(median), median)
  }

  // Invalid options must be rejected.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { weights: {} },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { measurement: 'invalidMeasurement' },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedMessage)
  }
  await pool.destroy()
})
410
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts the tasks queue flag and options; `undefined` options mean that
  // no tasks queue options are expected to be set.
  const expectTasksQueue = (enabled, options) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (options === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(options)
    }
  }

  // Disabled when the pool is created without options.
  expectTasksQueue(false, undefined)
  // Enabling without options is expected to yield a concurrency of 1.
  pool.enableTasksQueue(true)
  expectTasksQueue(true, { concurrency: 1 })
  // Enabling with explicit options.
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueue(true, { concurrency: 2 })
  // Disabling is expected to clear the options.
  pool.enableTasksQueue(false)
  expectTasksQueue(false, undefined)
  await pool.destroy()
})
429
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Options expected right after creation, then after an explicit update.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // Invalid tasks queue options paired with the expected error message.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      'Invalid tasks queue options: must be a plain object'
    ],
    [{ concurrency: 0 }, "Invalid worker tasks concurrency '0'"],
    [
      { concurrency: 0.2 },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedMessage
    )
  }
  await pool.destroy()
})
450
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Counters expected from a pool that has not executed any task yet.
  const pristineCounters = () => ({
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    queuedTasks: 0,
    maxQueuedTasks: 0,
    failedTasks: 0
  })

  // Fixed thread pool: every size related field equals numberOfWorkers.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...pristineCounters()
  })
  await fixedPool.destroy()

  // Dynamic cluster pool: worker node counts are expected at the minimum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...pristineCounters()
  })
  await dynamicPool.destroy()
})
498
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Matcher for a measurement holding only a CircularArray history.
  const historyOnly = () => ({ history: expect.any(CircularArray) })
  // Usage expected for each worker node of a pool that has run no task.
  const initialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly(),
    waitTime: historyOnly(),
    elu: {
      idle: historyOnly(),
      active: historyOnly()
    }
  }
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(initialUsage)
  }
  await pool.destroy()
})
531
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that every worker node owns an empty Queue instance.
  const expectEmptyTasksQueues = pool => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueue).toBeDefined()
      expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }

  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  // Destroy the second pool as well, like the first one, so that it is not
  // left alive once the test is over.
  await pool.destroy()
})
556
it('Verify that pool worker info are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()

  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    // Thread worker nodes are additionally expected to expose a
    // MessageChannel in their info.
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true,
      messageChannel: expect.any(MessageChannel)
    })
  }
  // Destroy the second pool as well, like the first one, so that it is not
  // left alive once the test is over.
  await pool.destroy()
})
586
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Matcher for a measurement holding only a CircularArray history.
  const historyOnly = () => ({ history: expect.any(CircularArray) })
  // Expected worker node usage for the given executed/executing counters.
  const expectedUsage = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly(),
    waitTime: historyOnly(),
    elu: {
      idle: historyOnly(),
      active: historyOnly()
    }
  })

  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const promises = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    promises.add(pool.execute())
  }
  // Before the tasks settle: maxMultiplier tasks executing on each node.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(0, maxMultiplier))
  }
  await Promise.all(promises)
  // After the tasks settled: maxMultiplier tasks executed on each node.
  for (const { usage } of pool.workerNodes) {
    expect(usage).toStrictEqual(expectedUsage(maxMultiplier, 0))
  }
  await pool.destroy()
})
650
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Matcher for a measurement holding only a CircularArray history.
  const historyOnly = () => ({ history: expect.any(CircularArray) })
  // Expected worker node usage with no task in flight, for the given
  // executed counter (a number or an asymmetric matcher).
  const expectedUsage = executed => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly(),
    waitTime: historyOnly(),
    elu: {
      idle: historyOnly(),
      active: historyOnly()
    }
  })
  // Asserts that all the measurement histories of a usage are empty.
  const expectEmptyHistories = usage => {
    expect(usage.runTime.history.length).toBe(0)
    expect(usage.waitTime.history.length).toBe(0)
    expect(usage.elu.idle.history.length).toBe(0)
    expect(usage.elu.active.history.length).toBe(0)
  }

  const maxMultiplier = 2
  const submittedTasks = numberOfWorkers * maxMultiplier
  const promises = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)

  // Every worker node has executed between 1 and maxMultiplier tasks.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(expect.any(Number)))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expectEmptyHistories(workerNode.usage)
  }

  // Changing the worker choice strategy must reset the executed counters.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(expectedUsage(0))
    expectEmptyHistories(workerNode.usage)
  }
  await pool.destroy()
})
726
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let fullEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.full, info => {
    fullEventsCount += 1
    lastPoolInfo = info
  })
  const submittedTasks = numberOfWorkers * 2
  const promises = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `full` event is triggered when the number of tasks submitted at once
  // reaches the maximum number of workers in the dynamic pool.
  // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to
  // numberOfWorkers * 2 tasks to the dynamic pool with
  // min = (max = numberOfWorkers) / 2.
  expect(fullEventsCount).toBe(numberOfWorkers * 2 - 1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    queuedTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
766
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  let readyEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.ready, info => {
    readyEventsCount += 1
    lastPoolInfo = info
  })
  // Wait for one `ready` event before asserting on what the listener saw.
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyEventsCount).toBe(1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    queuedTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
800
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let busyEventsCount = 0
  let lastPoolInfo
  pool.emitter.on(PoolEvents.busy, info => {
    busyEventsCount += 1
    lastPoolInfo = info
  })
  const submittedTasks = numberOfWorkers * 2
  const promises = new Set()
  for (let taskIndex = 0; taskIndex < submittedTasks; taskIndex++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to
  // numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventsCount).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(lastPoolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    queuedTasks: anyNumber,
    maxQueuedTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
70a4f5ea
JB
839
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task function name given.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  // Explicitly named task functions.
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  // Destroy the pool, as the other tests of this suite do, so that it is
  // not left alive once the test is over.
  await pool.destroy()
})
3ec964d6 856})