fix: ensure pool event full is emitted only once
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
184855e6 8 PoolTypes,
3d6dd312 9 WorkerChoiceStrategies,
184855e6 10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
23ccf9d7 14const { version } = require('../../../package.json')
2431bdb4 15const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
16
17describe('Abstract pool test suite', () => {
fc027381 18 const numberOfWorkers = 2
a8884ffd 19 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
20 isMain () {
21 return false
22 }
3ec964d6 23 }
3ec964d6 24
3ec964d6 25 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
26 expect(
27 () =>
a8884ffd 28 new StubPoolWithIsMain(
7c0ba920 29 numberOfWorkers,
8d3782fa
JB
30 './tests/worker-files/thread/testWorker.js',
31 {
8ebe6c30 32 errorHandler: (e) => console.error(e)
8d3782fa
JB
33 }
34 )
04f45163 35 ).toThrowError(
8c6d4acf 36 'Cannot start a pool from a worker with the same type as the pool'
04f45163 37 )
3ec964d6 38 })
c510fea7
APA
39
40 it('Verify that filePath is checked', () => {
292ad316
JB
41 const expectedError = new Error(
42 'Please specify a file with a worker implementation'
43 )
7c0ba920 44 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 45 expectedError
8d3782fa 46 )
7c0ba920 47 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 48 expectedError
8d3782fa 49 )
3d6dd312
JB
50 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
51 expectedError
52 )
53 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
54 expectedError
55 )
56 expect(
57 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
58 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
59 })
60
61 it('Verify that numberOfWorkers is checked', () => {
62 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 63 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
64 )
65 })
66
67 it('Verify that a negative number of workers is checked', () => {
68 expect(
69 () =>
70 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 ).toThrowError(
473c717a
JB
72 new RangeError(
73 'Cannot instantiate a pool with a negative number of workers'
74 )
8d3782fa
JB
75 )
76 })
77
78 it('Verify that a non integer number of workers is checked', () => {
79 expect(
80 () =>
81 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 ).toThrowError(
473c717a 83 new TypeError(
0d80593b 84 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
85 )
86 )
c510fea7 87 })
7c0ba920 88
216541b6 89 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
90 expect(
91 () =>
92 new DynamicClusterPool(
93 1,
94 undefined,
95 './tests/worker-files/cluster/testWorker.js'
96 )
97 ).toThrowError(
98 new TypeError(
99 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
100 )
101 )
2761efb4
JB
102 expect(
103 () =>
104 new DynamicThreadPool(
105 0.5,
106 1,
107 './tests/worker-files/thread/testWorker.js'
108 )
109 ).toThrowError(
110 new TypeError(
111 'Cannot instantiate a pool with a non safe integer number of workers'
112 )
113 )
114 expect(
115 () =>
116 new DynamicClusterPool(
117 0,
118 0.5,
a5ed75b7 119 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
120 )
121 ).toThrowError(
122 new TypeError(
123 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
124 )
125 )
2431bdb4
JB
126 expect(
127 () =>
128 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
129 ).toThrowError(
130 new RangeError(
131 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
132 )
133 )
134 expect(
135 () =>
2761efb4
JB
136 new DynamicClusterPool(
137 1,
138 1,
a5ed75b7 139 './tests/worker-files/cluster/testWorker.js'
2761efb4 140 )
2431bdb4
JB
141 ).toThrowError(
142 new RangeError(
143 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
144 )
145 )
21f710aa
JB
146 expect(
147 () =>
148 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
149 ).toThrowError(
150 new RangeError(
d640b48b 151 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
21f710aa
JB
152 )
153 )
2431bdb4
JB
154 })
155
fd7ebd49 156 it('Verify that pool options are checked', async () => {
7c0ba920
JB
157 let pool = new FixedThreadPool(
158 numberOfWorkers,
159 './tests/worker-files/thread/testWorker.js'
160 )
7c0ba920 161 expect(pool.emitter).toBeDefined()
1f68cede
JB
162 expect(pool.opts.enableEvents).toBe(true)
163 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 164 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 165 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
166 expect(pool.opts.workerChoiceStrategy).toBe(
167 WorkerChoiceStrategies.ROUND_ROBIN
168 )
da309861 169 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
170 choiceRetries: 6,
171 runTime: { median: false },
172 waitTime: { median: false },
173 elu: { median: false }
174 })
175 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
176 choiceRetries: 6,
932fc8be 177 runTime: { median: false },
5df69fab
JB
178 waitTime: { median: false },
179 elu: { median: false }
da309861 180 })
35cf1c03
JB
181 expect(pool.opts.messageHandler).toBeUndefined()
182 expect(pool.opts.errorHandler).toBeUndefined()
183 expect(pool.opts.onlineHandler).toBeUndefined()
184 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 185 await pool.destroy()
73bfd59d 186 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
187 pool = new FixedThreadPool(
188 numberOfWorkers,
189 './tests/worker-files/thread/testWorker.js',
190 {
e4543b14 191 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 192 workerChoiceStrategyOptions: {
932fc8be 193 runTime: { median: true },
fc027381 194 weights: { 0: 300, 1: 200 }
49be33fe 195 },
35cf1c03 196 enableEvents: false,
1f68cede 197 restartWorkerOnError: false,
ff733df7 198 enableTasksQueue: true,
d4aeae5a 199 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
200 messageHandler: testHandler,
201 errorHandler: testHandler,
202 onlineHandler: testHandler,
203 exitHandler: testHandler
7c0ba920
JB
204 }
205 )
7c0ba920 206 expect(pool.emitter).toBeUndefined()
1f68cede
JB
207 expect(pool.opts.enableEvents).toBe(false)
208 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 209 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 210 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 211 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 212 WorkerChoiceStrategies.LEAST_USED
e843b904 213 )
da309861 214 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 215 choiceRetries: 6,
932fc8be 216 runTime: { median: true },
8990357d
JB
217 waitTime: { median: false },
218 elu: { median: false },
219 weights: { 0: 300, 1: 200 }
220 })
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
222 choiceRetries: 6,
223 runTime: { median: true },
224 waitTime: { median: false },
225 elu: { median: false },
fc027381 226 weights: { 0: 300, 1: 200 }
da309861 227 })
35cf1c03
JB
228 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
229 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
230 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
231 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 232 await pool.destroy()
7c0ba920
JB
233 })
234
a20f0ba5 235 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
236 expect(
237 () =>
238 new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.js',
241 {
f0d7f803 242 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
243 }
244 )
f0d7f803 245 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
49be33fe
JB
246 expect(
247 () =>
248 new FixedThreadPool(
249 numberOfWorkers,
250 './tests/worker-files/thread/testWorker.js',
251 {
252 workerChoiceStrategyOptions: { weights: {} }
253 }
254 )
255 ).toThrowError(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
f0d7f803
JB
258 expect(
259 () =>
260 new FixedThreadPool(
261 numberOfWorkers,
262 './tests/worker-files/thread/testWorker.js',
263 {
264 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
265 }
266 )
267 ).toThrowError(
268 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
269 )
270 expect(
271 () =>
272 new FixedThreadPool(
273 numberOfWorkers,
274 './tests/worker-files/thread/testWorker.js',
275 {
276 enableTasksQueue: true,
277 tasksQueueOptions: { concurrency: 0 }
278 }
279 )
280 ).toThrowError("Invalid worker tasks concurrency '0'")
281 expect(
282 () =>
283 new FixedThreadPool(
284 numberOfWorkers,
285 './tests/worker-files/thread/testWorker.js',
286 {
287 enableTasksQueue: true,
288 tasksQueueOptions: 'invalidTasksQueueOptions'
289 }
290 )
291 ).toThrowError('Invalid tasks queue options: must be a plain object')
292 expect(
293 () =>
294 new FixedThreadPool(
295 numberOfWorkers,
296 './tests/worker-files/thread/testWorker.js',
297 {
298 enableTasksQueue: true,
299 tasksQueueOptions: { concurrency: 0.2 }
300 }
301 )
302 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
d4aeae5a
JB
303 })
304
2431bdb4 305 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
306 const pool = new FixedThreadPool(
307 numberOfWorkers,
308 './tests/worker-files/thread/testWorker.js',
309 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
310 )
311 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
312 choiceRetries: 6,
313 runTime: { median: false },
314 waitTime: { median: false },
315 elu: { median: false }
316 })
317 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
318 choiceRetries: 6,
932fc8be 319 runTime: { median: false },
5df69fab
JB
320 waitTime: { median: false },
321 elu: { median: false }
a20f0ba5
JB
322 })
323 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
324 .workerChoiceStrategies) {
86bf340d 325 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 326 choiceRetries: 6,
932fc8be 327 runTime: { median: false },
5df69fab
JB
328 waitTime: { median: false },
329 elu: { median: false }
86bf340d 330 })
a20f0ba5 331 }
87de9ff5
JB
332 expect(
333 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
334 ).toStrictEqual({
932fc8be
JB
335 runTime: {
336 aggregate: true,
337 average: true,
338 median: false
339 },
340 waitTime: {
341 aggregate: false,
342 average: false,
343 median: false
344 },
5df69fab 345 elu: {
9adcefab
JB
346 aggregate: true,
347 average: true,
5df69fab
JB
348 median: false
349 }
86bf340d 350 })
9adcefab
JB
351 pool.setWorkerChoiceStrategyOptions({
352 runTime: { median: true },
353 elu: { median: true }
354 })
a20f0ba5 355 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 356 choiceRetries: 6,
9adcefab 357 runTime: { median: true },
8990357d
JB
358 waitTime: { median: false },
359 elu: { median: true }
360 })
361 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
362 choiceRetries: 6,
363 runTime: { median: true },
364 waitTime: { median: false },
9adcefab 365 elu: { median: true }
a20f0ba5
JB
366 })
367 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
368 .workerChoiceStrategies) {
932fc8be 369 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 370 choiceRetries: 6,
9adcefab 371 runTime: { median: true },
8990357d 372 waitTime: { median: false },
9adcefab 373 elu: { median: true }
932fc8be 374 })
a20f0ba5 375 }
87de9ff5
JB
376 expect(
377 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
378 ).toStrictEqual({
932fc8be
JB
379 runTime: {
380 aggregate: true,
381 average: false,
382 median: true
383 },
384 waitTime: {
385 aggregate: false,
386 average: false,
387 median: false
388 },
5df69fab 389 elu: {
9adcefab 390 aggregate: true,
5df69fab 391 average: false,
9adcefab 392 median: true
5df69fab 393 }
86bf340d 394 })
9adcefab
JB
395 pool.setWorkerChoiceStrategyOptions({
396 runTime: { median: false },
397 elu: { median: false }
398 })
a20f0ba5 399 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
400 choiceRetries: 6,
401 runTime: { median: false },
402 waitTime: { median: false },
403 elu: { median: false }
404 })
405 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
406 choiceRetries: 6,
9adcefab 407 runTime: { median: false },
8990357d 408 waitTime: { median: false },
9adcefab 409 elu: { median: false }
a20f0ba5
JB
410 })
411 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
412 .workerChoiceStrategies) {
932fc8be 413 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 414 choiceRetries: 6,
9adcefab 415 runTime: { median: false },
8990357d 416 waitTime: { median: false },
9adcefab 417 elu: { median: false }
932fc8be 418 })
a20f0ba5 419 }
87de9ff5
JB
420 expect(
421 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
422 ).toStrictEqual({
932fc8be
JB
423 runTime: {
424 aggregate: true,
425 average: true,
426 median: false
427 },
428 waitTime: {
429 aggregate: false,
430 average: false,
431 median: false
432 },
5df69fab 433 elu: {
9adcefab
JB
434 aggregate: true,
435 average: true,
5df69fab
JB
436 median: false
437 }
86bf340d 438 })
1f95d544
JB
439 expect(() =>
440 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
441 ).toThrowError(
442 'Invalid worker choice strategy options: must be a plain object'
443 )
444 expect(() =>
445 pool.setWorkerChoiceStrategyOptions({ weights: {} })
446 ).toThrowError(
447 'Invalid worker choice strategy options: must have a weight for each worker node'
448 )
449 expect(() =>
450 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
451 ).toThrowError(
452 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
453 )
a20f0ba5
JB
454 await pool.destroy()
455 })
456
2431bdb4 457 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
458 const pool = new FixedThreadPool(
459 numberOfWorkers,
460 './tests/worker-files/thread/testWorker.js'
461 )
462 expect(pool.opts.enableTasksQueue).toBe(false)
463 expect(pool.opts.tasksQueueOptions).toBeUndefined()
464 pool.enableTasksQueue(true)
465 expect(pool.opts.enableTasksQueue).toBe(true)
466 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
467 pool.enableTasksQueue(true, { concurrency: 2 })
468 expect(pool.opts.enableTasksQueue).toBe(true)
469 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
470 pool.enableTasksQueue(false)
471 expect(pool.opts.enableTasksQueue).toBe(false)
472 expect(pool.opts.tasksQueueOptions).toBeUndefined()
473 await pool.destroy()
474 })
475
2431bdb4 476 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
477 const pool = new FixedThreadPool(
478 numberOfWorkers,
479 './tests/worker-files/thread/testWorker.js',
480 { enableTasksQueue: true }
481 )
482 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
483 pool.setTasksQueueOptions({ concurrency: 2 })
484 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
485 expect(() =>
486 pool.setTasksQueueOptions('invalidTasksQueueOptions')
487 ).toThrowError('Invalid tasks queue options: must be a plain object')
a20f0ba5
JB
488 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
489 "Invalid worker tasks concurrency '0'"
490 )
f0d7f803
JB
491 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
492 'Invalid worker tasks concurrency: must be an integer'
493 )
a20f0ba5
JB
494 await pool.destroy()
495 })
496
6b27d407
JB
497 it('Verify that pool info is set', async () => {
498 let pool = new FixedThreadPool(
499 numberOfWorkers,
500 './tests/worker-files/thread/testWorker.js'
501 )
2dca6cab
JB
502 expect(pool.info).toStrictEqual({
503 version,
504 type: PoolTypes.fixed,
505 worker: WorkerTypes.thread,
506 ready: true,
507 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
508 minSize: numberOfWorkers,
509 maxSize: numberOfWorkers,
510 workerNodes: numberOfWorkers,
511 idleWorkerNodes: numberOfWorkers,
512 busyWorkerNodes: 0,
513 executedTasks: 0,
514 executingTasks: 0,
2dca6cab
JB
515 failedTasks: 0
516 })
6b27d407
JB
517 await pool.destroy()
518 pool = new DynamicClusterPool(
2431bdb4 519 Math.floor(numberOfWorkers / 2),
6b27d407 520 numberOfWorkers,
ecdfbdc0 521 './tests/worker-files/cluster/testWorker.js'
6b27d407 522 )
2dca6cab
JB
523 expect(pool.info).toStrictEqual({
524 version,
525 type: PoolTypes.dynamic,
526 worker: WorkerTypes.cluster,
527 ready: true,
528 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
529 minSize: Math.floor(numberOfWorkers / 2),
530 maxSize: numberOfWorkers,
531 workerNodes: Math.floor(numberOfWorkers / 2),
532 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
533 busyWorkerNodes: 0,
534 executedTasks: 0,
535 executingTasks: 0,
2dca6cab
JB
536 failedTasks: 0
537 })
6b27d407
JB
538 await pool.destroy()
539 })
540
2431bdb4 541 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
542 const pool = new FixedClusterPool(
543 numberOfWorkers,
544 './tests/worker-files/cluster/testWorker.js'
545 )
f06e48d8 546 for (const workerNode of pool.workerNodes) {
465b2940 547 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
548 tasks: {
549 executed: 0,
550 executing: 0,
551 queued: 0,
df593701 552 maxQueued: 0,
a4e07f72
JB
553 failed: 0
554 },
555 runTime: {
a4e07f72
JB
556 history: expect.any(CircularArray)
557 },
558 waitTime: {
a4e07f72
JB
559 history: expect.any(CircularArray)
560 },
5df69fab
JB
561 elu: {
562 idle: {
5df69fab
JB
563 history: expect.any(CircularArray)
564 },
565 active: {
5df69fab 566 history: expect.any(CircularArray)
f7510105 567 }
5df69fab 568 }
86bf340d 569 })
f06e48d8
JB
570 }
571 await pool.destroy()
572 })
573
2431bdb4
JB
574 it('Verify that pool worker tasks queue are initialized', async () => {
575 let pool = new FixedClusterPool(
f06e48d8
JB
576 numberOfWorkers,
577 './tests/worker-files/cluster/testWorker.js'
578 )
579 for (const workerNode of pool.workerNodes) {
580 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 581 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 582 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 583 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 584 }
fd7ebd49 585 await pool.destroy()
2431bdb4
JB
586 pool = new DynamicThreadPool(
587 Math.floor(numberOfWorkers / 2),
588 numberOfWorkers,
589 './tests/worker-files/thread/testWorker.js'
590 )
591 for (const workerNode of pool.workerNodes) {
592 expect(workerNode.tasksQueue).toBeDefined()
593 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
594 expect(workerNode.tasksQueue.size).toBe(0)
595 expect(workerNode.tasksQueue.maxSize).toBe(0)
596 }
597 })
598
599 it('Verify that pool worker info are initialized', async () => {
600 let pool = new FixedClusterPool(
601 numberOfWorkers,
602 './tests/worker-files/cluster/testWorker.js'
603 )
2dca6cab
JB
604 for (const workerNode of pool.workerNodes) {
605 expect(workerNode.info).toStrictEqual({
606 id: expect.any(Number),
607 type: WorkerTypes.cluster,
608 dynamic: false,
609 ready: true
610 })
611 }
2431bdb4
JB
612 await pool.destroy()
613 pool = new DynamicThreadPool(
614 Math.floor(numberOfWorkers / 2),
615 numberOfWorkers,
616 './tests/worker-files/thread/testWorker.js'
617 )
2dca6cab
JB
618 for (const workerNode of pool.workerNodes) {
619 expect(workerNode.info).toStrictEqual({
620 id: expect.any(Number),
621 type: WorkerTypes.thread,
622 dynamic: false,
7884d183 623 ready: true
2dca6cab
JB
624 })
625 }
bf9549ae
JB
626 })
627
2431bdb4 628 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
629 const pool = new FixedClusterPool(
630 numberOfWorkers,
631 './tests/worker-files/cluster/testWorker.js'
632 )
09c2d0d3 633 const promises = new Set()
fc027381
JB
634 const maxMultiplier = 2
635 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 636 promises.add(pool.execute())
bf9549ae 637 }
f06e48d8 638 for (const workerNode of pool.workerNodes) {
465b2940 639 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
640 tasks: {
641 executed: 0,
642 executing: maxMultiplier,
643 queued: 0,
df593701 644 maxQueued: 0,
a4e07f72
JB
645 failed: 0
646 },
647 runTime: {
a4e07f72
JB
648 history: expect.any(CircularArray)
649 },
650 waitTime: {
a4e07f72
JB
651 history: expect.any(CircularArray)
652 },
5df69fab
JB
653 elu: {
654 idle: {
5df69fab
JB
655 history: expect.any(CircularArray)
656 },
657 active: {
5df69fab 658 history: expect.any(CircularArray)
f7510105 659 }
5df69fab 660 }
86bf340d 661 })
bf9549ae
JB
662 }
663 await Promise.all(promises)
f06e48d8 664 for (const workerNode of pool.workerNodes) {
465b2940 665 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
666 tasks: {
667 executed: maxMultiplier,
668 executing: 0,
669 queued: 0,
df593701 670 maxQueued: 0,
a4e07f72
JB
671 failed: 0
672 },
673 runTime: {
a4e07f72
JB
674 history: expect.any(CircularArray)
675 },
676 waitTime: {
a4e07f72
JB
677 history: expect.any(CircularArray)
678 },
5df69fab
JB
679 elu: {
680 idle: {
5df69fab
JB
681 history: expect.any(CircularArray)
682 },
683 active: {
5df69fab 684 history: expect.any(CircularArray)
f7510105 685 }
5df69fab 686 }
86bf340d 687 })
bf9549ae 688 }
fd7ebd49 689 await pool.destroy()
bf9549ae
JB
690 })
691
2431bdb4 692 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 693 const pool = new DynamicThreadPool(
2431bdb4 694 Math.floor(numberOfWorkers / 2),
8f4878b7 695 numberOfWorkers,
9e619829
JB
696 './tests/worker-files/thread/testWorker.js'
697 )
09c2d0d3 698 const promises = new Set()
ee9f5295
JB
699 const maxMultiplier = 2
700 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 701 promises.add(pool.execute())
9e619829
JB
702 }
703 await Promise.all(promises)
f06e48d8 704 for (const workerNode of pool.workerNodes) {
465b2940 705 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
706 tasks: {
707 executed: expect.any(Number),
708 executing: 0,
709 queued: 0,
df593701 710 maxQueued: 0,
a4e07f72
JB
711 failed: 0
712 },
713 runTime: {
a4e07f72
JB
714 history: expect.any(CircularArray)
715 },
716 waitTime: {
a4e07f72
JB
717 history: expect.any(CircularArray)
718 },
5df69fab
JB
719 elu: {
720 idle: {
5df69fab
JB
721 history: expect.any(CircularArray)
722 },
723 active: {
5df69fab 724 history: expect.any(CircularArray)
f7510105 725 }
5df69fab 726 }
86bf340d 727 })
465b2940 728 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
b56c2ee5 729 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
b97d82d8
JB
730 expect(workerNode.usage.runTime.history.length).toBe(0)
731 expect(workerNode.usage.waitTime.history.length).toBe(0)
732 expect(workerNode.usage.elu.idle.history.length).toBe(0)
733 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
734 }
735 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 736 for (const workerNode of pool.workerNodes) {
465b2940 737 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
738 tasks: {
739 executed: 0,
740 executing: 0,
741 queued: 0,
df593701 742 maxQueued: 0,
a4e07f72
JB
743 failed: 0
744 },
745 runTime: {
a4e07f72
JB
746 history: expect.any(CircularArray)
747 },
748 waitTime: {
a4e07f72
JB
749 history: expect.any(CircularArray)
750 },
5df69fab
JB
751 elu: {
752 idle: {
5df69fab
JB
753 history: expect.any(CircularArray)
754 },
755 active: {
5df69fab 756 history: expect.any(CircularArray)
f7510105 757 }
5df69fab 758 }
86bf340d 759 })
465b2940
JB
760 expect(workerNode.usage.runTime.history.length).toBe(0)
761 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
762 expect(workerNode.usage.elu.idle.history.length).toBe(0)
763 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 764 }
fd7ebd49 765 await pool.destroy()
ee11a4a2
JB
766 })
767
a1763c54
JB
768 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
769 const pool = new DynamicClusterPool(
2431bdb4 770 Math.floor(numberOfWorkers / 2),
164d950a 771 numberOfWorkers,
a1763c54 772 './tests/worker-files/cluster/testWorker.js'
164d950a 773 )
d46660cd 774 let poolInfo
a1763c54
JB
775 let poolReady = 0
776 pool.emitter.on(PoolEvents.ready, (info) => {
777 ++poolReady
d46660cd
JB
778 poolInfo = info
779 })
a1763c54
JB
780 await waitPoolEvents(pool, PoolEvents.ready, 1)
781 expect(poolReady).toBe(1)
d46660cd 782 expect(poolInfo).toStrictEqual({
23ccf9d7 783 version,
d46660cd 784 type: PoolTypes.dynamic,
a1763c54
JB
785 worker: WorkerTypes.cluster,
786 ready: true,
2431bdb4
JB
787 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
788 minSize: expect.any(Number),
789 maxSize: expect.any(Number),
790 workerNodes: expect.any(Number),
791 idleWorkerNodes: expect.any(Number),
792 busyWorkerNodes: expect.any(Number),
793 executedTasks: expect.any(Number),
794 executingTasks: expect.any(Number),
2431bdb4
JB
795 failedTasks: expect.any(Number)
796 })
797 await pool.destroy()
798 })
799
a1763c54
JB
800 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
801 const pool = new FixedThreadPool(
2431bdb4 802 numberOfWorkers,
a1763c54 803 './tests/worker-files/thread/testWorker.js'
2431bdb4 804 )
a1763c54
JB
805 const promises = new Set()
806 let poolBusy = 0
2431bdb4 807 let poolInfo
a1763c54
JB
808 pool.emitter.on(PoolEvents.busy, (info) => {
809 ++poolBusy
2431bdb4
JB
810 poolInfo = info
811 })
a1763c54
JB
812 for (let i = 0; i < numberOfWorkers * 2; i++) {
813 promises.add(pool.execute())
814 }
815 await Promise.all(promises)
816 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
817 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
818 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
819 expect(poolInfo).toStrictEqual({
820 version,
a1763c54
JB
821 type: PoolTypes.fixed,
822 worker: WorkerTypes.thread,
823 ready: expect.any(Boolean),
2431bdb4 824 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
825 minSize: expect.any(Number),
826 maxSize: expect.any(Number),
827 workerNodes: expect.any(Number),
828 idleWorkerNodes: expect.any(Number),
829 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
830 executedTasks: expect.any(Number),
831 executingTasks: expect.any(Number),
a4e07f72 832 failedTasks: expect.any(Number)
d46660cd 833 })
164d950a
JB
834 await pool.destroy()
835 })
836
a1763c54
JB
837 it("Verify that pool event emitter 'full' event can register a callback", async () => {
838 const pool = new DynamicThreadPool(
839 Math.floor(numberOfWorkers / 2),
7c0ba920
JB
840 numberOfWorkers,
841 './tests/worker-files/thread/testWorker.js'
842 )
09c2d0d3 843 const promises = new Set()
a1763c54 844 let poolFull = 0
d46660cd 845 let poolInfo
a1763c54
JB
846 pool.emitter.on(PoolEvents.full, (info) => {
847 ++poolFull
d46660cd
JB
848 poolInfo = info
849 })
7c0ba920 850 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 851 promises.add(pool.execute())
7c0ba920 852 }
cf597bc5 853 await Promise.all(promises)
33e6bb4c 854 expect(poolFull).toBe(1)
d46660cd 855 expect(poolInfo).toStrictEqual({
23ccf9d7 856 version,
a1763c54 857 type: PoolTypes.dynamic,
d46660cd 858 worker: WorkerTypes.thread,
2431bdb4
JB
859 ready: expect.any(Boolean),
860 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
861 minSize: expect.any(Number),
862 maxSize: expect.any(Number),
863 workerNodes: expect.any(Number),
864 idleWorkerNodes: expect.any(Number),
865 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
866 executedTasks: expect.any(Number),
867 executingTasks: expect.any(Number),
a4e07f72 868 failedTasks: expect.any(Number)
d46660cd 869 })
fd7ebd49 870 await pool.destroy()
7c0ba920 871 })
70a4f5ea 872
90d7d101
JB
873 it('Verify that listTaskFunctions() is working', async () => {
874 const dynamicThreadPool = new DynamicThreadPool(
875 Math.floor(numberOfWorkers / 2),
876 numberOfWorkers,
877 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
878 )
879 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
880 expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
881 'default',
882 'jsonIntegerSerialization',
883 'factorial',
884 'fibonacci'
885 ])
886 const fixedClusterPool = new FixedClusterPool(
887 numberOfWorkers,
888 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
889 )
890 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
891 expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
892 'default',
893 'jsonIntegerSerialization',
894 'factorial',
895 'fibonacci'
896 ])
897 })
898
899 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 900 const pool = new DynamicClusterPool(
2431bdb4 901 Math.floor(numberOfWorkers / 2),
70a4f5ea 902 numberOfWorkers,
90d7d101 903 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
904 )
905 const data = { n: 10 }
82888165 906 const result0 = await pool.execute(data)
30b963d4 907 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 908 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 909 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
910 const result2 = await pool.execute(data, 'factorial')
911 expect(result2).toBe(3628800)
912 const result3 = await pool.execute(data, 'fibonacci')
024daf59 913 expect(result3).toBe(55)
5bb5be17
JB
914 expect(pool.info.executingTasks).toBe(0)
915 expect(pool.info.executedTasks).toBe(4)
b414b84c
JB
916 for (const workerNode of pool.workerNodes) {
917 expect(workerNode.info.taskFunctions).toStrictEqual([
918 'default',
919 'jsonIntegerSerialization',
920 'factorial',
921 'fibonacci'
922 ])
923 expect(workerNode.taskFunctionsUsage.size).toBe(3)
924 for (const name of pool.listTaskFunctions()) {
5bb5be17
JB
925 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
926 tasks: {
927 executed: expect.any(Number),
928 executing: expect.any(Number),
929 failed: 0,
930 queued: 0
931 },
932 runTime: {
933 history: expect.any(CircularArray)
934 },
935 waitTime: {
936 history: expect.any(CircularArray)
937 },
938 elu: {
939 idle: {
940 history: expect.any(CircularArray)
941 },
942 active: {
943 history: expect.any(CircularArray)
944 }
945 }
946 })
947 expect(
948 workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
949 ).toBeGreaterThanOrEqual(0)
950 }
951 }
70a4f5ea 952 })
3ec964d6 953})