Merge pull request #883 from poolifier/dependabot/npm_and_yarn/examples/typescript...
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
b1aae695 2const sinon = require('sinon')
e843b904 3const {
70a4f5ea 4 DynamicClusterPool,
9e619829 5 DynamicThreadPool,
aee46736 6 FixedClusterPool,
e843b904 7 FixedThreadPool,
aee46736 8 PoolEvents,
184855e6 9 PoolTypes,
3d6dd312 10 WorkerChoiceStrategies,
184855e6 11 WorkerTypes
cdace0e5 12} = require('../../../lib')
78099a15 13const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 14const { Queue } = require('../../../lib/queue')
23ccf9d7 15const { version } = require('../../../package.json')
2431bdb4 16const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
17
18describe('Abstract pool test suite', () => {
fc027381 19 const numberOfWorkers = 2
a8884ffd 20 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
21 isMain () {
22 return false
23 }
3ec964d6 24 }
3ec964d6 25
3ec964d6 26 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
27 expect(
28 () =>
a8884ffd 29 new StubPoolWithIsMain(
7c0ba920 30 numberOfWorkers,
8d3782fa
JB
31 './tests/worker-files/thread/testWorker.js',
32 {
8ebe6c30 33 errorHandler: (e) => console.error(e)
8d3782fa
JB
34 }
35 )
04f45163 36 ).toThrowError(
8c6d4acf 37 'Cannot start a pool from a worker with the same type as the pool'
04f45163 38 )
3ec964d6 39 })
c510fea7
APA
40
41 it('Verify that filePath is checked', () => {
292ad316
JB
42 const expectedError = new Error(
43 'Please specify a file with a worker implementation'
44 )
7c0ba920 45 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 46 expectedError
8d3782fa 47 )
7c0ba920 48 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 49 expectedError
8d3782fa 50 )
3d6dd312
JB
51 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
52 expectedError
53 )
54 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
55 expectedError
56 )
57 expect(
58 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
59 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
60 })
61
62 it('Verify that numberOfWorkers is checked', () => {
63 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 64 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
65 )
66 })
67
68 it('Verify that a negative number of workers is checked', () => {
69 expect(
70 () =>
71 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
72 ).toThrowError(
473c717a
JB
73 new RangeError(
74 'Cannot instantiate a pool with a negative number of workers'
75 )
8d3782fa
JB
76 )
77 })
78
79 it('Verify that a non integer number of workers is checked', () => {
80 expect(
81 () =>
82 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
83 ).toThrowError(
473c717a 84 new TypeError(
0d80593b 85 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
86 )
87 )
c510fea7 88 })
7c0ba920 89
216541b6 90 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
91 expect(
92 () =>
93 new DynamicClusterPool(
94 1,
95 undefined,
96 './tests/worker-files/cluster/testWorker.js'
97 )
98 ).toThrowError(
99 new TypeError(
100 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
101 )
102 )
2761efb4
JB
103 expect(
104 () =>
105 new DynamicThreadPool(
106 0.5,
107 1,
108 './tests/worker-files/thread/testWorker.js'
109 )
110 ).toThrowError(
111 new TypeError(
112 'Cannot instantiate a pool with a non safe integer number of workers'
113 )
114 )
115 expect(
116 () =>
117 new DynamicClusterPool(
118 0,
119 0.5,
a5ed75b7 120 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
121 )
122 ).toThrowError(
123 new TypeError(
124 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
125 )
126 )
2431bdb4
JB
127 expect(
128 () =>
129 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
130 ).toThrowError(
131 new RangeError(
132 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
133 )
134 )
135 expect(
136 () =>
2761efb4
JB
137 new DynamicClusterPool(
138 1,
139 1,
a5ed75b7 140 './tests/worker-files/cluster/testWorker.js'
2761efb4 141 )
2431bdb4
JB
142 ).toThrowError(
143 new RangeError(
144 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
145 )
146 )
21f710aa
JB
147 expect(
148 () =>
149 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
150 ).toThrowError(
151 new RangeError(
d640b48b 152 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
21f710aa
JB
153 )
154 )
2431bdb4
JB
155 })
156
fd7ebd49 157 it('Verify that pool options are checked', async () => {
7c0ba920
JB
158 let pool = new FixedThreadPool(
159 numberOfWorkers,
160 './tests/worker-files/thread/testWorker.js'
161 )
7c0ba920 162 expect(pool.emitter).toBeDefined()
1f68cede
JB
163 expect(pool.opts.enableEvents).toBe(true)
164 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 165 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 166 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
167 expect(pool.opts.workerChoiceStrategy).toBe(
168 WorkerChoiceStrategies.ROUND_ROBIN
169 )
da309861 170 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
171 choiceRetries: 6,
172 runTime: { median: false },
173 waitTime: { median: false },
174 elu: { median: false }
175 })
176 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
177 choiceRetries: 6,
932fc8be 178 runTime: { median: false },
5df69fab
JB
179 waitTime: { median: false },
180 elu: { median: false }
da309861 181 })
35cf1c03
JB
182 expect(pool.opts.messageHandler).toBeUndefined()
183 expect(pool.opts.errorHandler).toBeUndefined()
184 expect(pool.opts.onlineHandler).toBeUndefined()
185 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 186 await pool.destroy()
73bfd59d 187 const testHandler = () => console.info('test handler executed')
7c0ba920
JB
188 pool = new FixedThreadPool(
189 numberOfWorkers,
190 './tests/worker-files/thread/testWorker.js',
191 {
e4543b14 192 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 193 workerChoiceStrategyOptions: {
932fc8be 194 runTime: { median: true },
fc027381 195 weights: { 0: 300, 1: 200 }
49be33fe 196 },
35cf1c03 197 enableEvents: false,
1f68cede 198 restartWorkerOnError: false,
ff733df7 199 enableTasksQueue: true,
d4aeae5a 200 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
201 messageHandler: testHandler,
202 errorHandler: testHandler,
203 onlineHandler: testHandler,
204 exitHandler: testHandler
7c0ba920
JB
205 }
206 )
7c0ba920 207 expect(pool.emitter).toBeUndefined()
1f68cede
JB
208 expect(pool.opts.enableEvents).toBe(false)
209 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 210 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 211 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 212 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 213 WorkerChoiceStrategies.LEAST_USED
e843b904 214 )
da309861 215 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 216 choiceRetries: 6,
932fc8be 217 runTime: { median: true },
8990357d
JB
218 waitTime: { median: false },
219 elu: { median: false },
220 weights: { 0: 300, 1: 200 }
221 })
222 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
223 choiceRetries: 6,
224 runTime: { median: true },
225 waitTime: { median: false },
226 elu: { median: false },
fc027381 227 weights: { 0: 300, 1: 200 }
da309861 228 })
35cf1c03
JB
229 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
230 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
231 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
232 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 233 await pool.destroy()
7c0ba920
JB
234 })
235
a20f0ba5 236 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
237 expect(
238 () =>
239 new FixedThreadPool(
240 numberOfWorkers,
241 './tests/worker-files/thread/testWorker.js',
242 {
f0d7f803 243 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
244 }
245 )
8735b4e5
JB
246 ).toThrowError(
247 new Error("Invalid worker choice strategy 'invalidStrategy'")
248 )
49be33fe
JB
249 expect(
250 () =>
251 new FixedThreadPool(
252 numberOfWorkers,
253 './tests/worker-files/thread/testWorker.js',
254 {
255 workerChoiceStrategyOptions: { weights: {} }
256 }
257 )
258 ).toThrowError(
8735b4e5
JB
259 new Error(
260 'Invalid worker choice strategy options: must have a weight for each worker node'
261 )
49be33fe 262 )
f0d7f803
JB
263 expect(
264 () =>
265 new FixedThreadPool(
266 numberOfWorkers,
267 './tests/worker-files/thread/testWorker.js',
268 {
269 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
270 }
271 )
272 ).toThrowError(
8735b4e5
JB
273 new Error(
274 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
275 )
f0d7f803
JB
276 )
277 expect(
278 () =>
279 new FixedThreadPool(
280 numberOfWorkers,
281 './tests/worker-files/thread/testWorker.js',
282 {
283 enableTasksQueue: true,
284 tasksQueueOptions: { concurrency: 0 }
285 }
286 )
8735b4e5
JB
287 ).toThrowError(
288 new TypeError(
289 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
290 )
291 )
f0d7f803
JB
292 expect(
293 () =>
294 new FixedThreadPool(
295 numberOfWorkers,
296 './tests/worker-files/thread/testWorker.js',
297 {
298 enableTasksQueue: true,
299 tasksQueueOptions: 'invalidTasksQueueOptions'
300 }
301 )
8735b4e5
JB
302 ).toThrowError(
303 new TypeError('Invalid tasks queue options: must be a plain object')
304 )
f0d7f803
JB
305 expect(
306 () =>
307 new FixedThreadPool(
308 numberOfWorkers,
309 './tests/worker-files/thread/testWorker.js',
310 {
311 enableTasksQueue: true,
312 tasksQueueOptions: { concurrency: 0.2 }
313 }
314 )
8735b4e5
JB
315 ).toThrowError(
316 new TypeError('Invalid worker tasks concurrency: must be an integer')
317 )
d4aeae5a
JB
318 })
319
2431bdb4 320 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
321 const pool = new FixedThreadPool(
322 numberOfWorkers,
323 './tests/worker-files/thread/testWorker.js',
324 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
325 )
326 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
327 choiceRetries: 6,
328 runTime: { median: false },
329 waitTime: { median: false },
330 elu: { median: false }
331 })
332 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
333 choiceRetries: 6,
932fc8be 334 runTime: { median: false },
5df69fab
JB
335 waitTime: { median: false },
336 elu: { median: false }
a20f0ba5
JB
337 })
338 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
339 .workerChoiceStrategies) {
86bf340d 340 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 341 choiceRetries: 6,
932fc8be 342 runTime: { median: false },
5df69fab
JB
343 waitTime: { median: false },
344 elu: { median: false }
86bf340d 345 })
a20f0ba5 346 }
87de9ff5
JB
347 expect(
348 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
349 ).toStrictEqual({
932fc8be
JB
350 runTime: {
351 aggregate: true,
352 average: true,
353 median: false
354 },
355 waitTime: {
356 aggregate: false,
357 average: false,
358 median: false
359 },
5df69fab 360 elu: {
9adcefab
JB
361 aggregate: true,
362 average: true,
5df69fab
JB
363 median: false
364 }
86bf340d 365 })
9adcefab
JB
366 pool.setWorkerChoiceStrategyOptions({
367 runTime: { median: true },
368 elu: { median: true }
369 })
a20f0ba5 370 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d 371 choiceRetries: 6,
9adcefab 372 runTime: { median: true },
8990357d
JB
373 waitTime: { median: false },
374 elu: { median: true }
375 })
376 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
377 choiceRetries: 6,
378 runTime: { median: true },
379 waitTime: { median: false },
9adcefab 380 elu: { median: true }
a20f0ba5
JB
381 })
382 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
383 .workerChoiceStrategies) {
932fc8be 384 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 385 choiceRetries: 6,
9adcefab 386 runTime: { median: true },
8990357d 387 waitTime: { median: false },
9adcefab 388 elu: { median: true }
932fc8be 389 })
a20f0ba5 390 }
87de9ff5
JB
391 expect(
392 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
393 ).toStrictEqual({
932fc8be
JB
394 runTime: {
395 aggregate: true,
396 average: false,
397 median: true
398 },
399 waitTime: {
400 aggregate: false,
401 average: false,
402 median: false
403 },
5df69fab 404 elu: {
9adcefab 405 aggregate: true,
5df69fab 406 average: false,
9adcefab 407 median: true
5df69fab 408 }
86bf340d 409 })
9adcefab
JB
410 pool.setWorkerChoiceStrategyOptions({
411 runTime: { median: false },
412 elu: { median: false }
413 })
a20f0ba5 414 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
8990357d
JB
415 choiceRetries: 6,
416 runTime: { median: false },
417 waitTime: { median: false },
418 elu: { median: false }
419 })
420 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
421 choiceRetries: 6,
9adcefab 422 runTime: { median: false },
8990357d 423 waitTime: { median: false },
9adcefab 424 elu: { median: false }
a20f0ba5
JB
425 })
426 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
427 .workerChoiceStrategies) {
932fc8be 428 expect(workerChoiceStrategy.opts).toStrictEqual({
8990357d 429 choiceRetries: 6,
9adcefab 430 runTime: { median: false },
8990357d 431 waitTime: { median: false },
9adcefab 432 elu: { median: false }
932fc8be 433 })
a20f0ba5 434 }
87de9ff5
JB
435 expect(
436 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
437 ).toStrictEqual({
932fc8be
JB
438 runTime: {
439 aggregate: true,
440 average: true,
441 median: false
442 },
443 waitTime: {
444 aggregate: false,
445 average: false,
446 median: false
447 },
5df69fab 448 elu: {
9adcefab
JB
449 aggregate: true,
450 average: true,
5df69fab
JB
451 median: false
452 }
86bf340d 453 })
1f95d544
JB
454 expect(() =>
455 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
456 ).toThrowError(
8735b4e5
JB
457 new TypeError(
458 'Invalid worker choice strategy options: must be a plain object'
459 )
1f95d544
JB
460 )
461 expect(() =>
462 pool.setWorkerChoiceStrategyOptions({ weights: {} })
463 ).toThrowError(
8735b4e5
JB
464 new Error(
465 'Invalid worker choice strategy options: must have a weight for each worker node'
466 )
1f95d544
JB
467 )
468 expect(() =>
469 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
470 ).toThrowError(
8735b4e5
JB
471 new Error(
472 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
473 )
1f95d544 474 )
a20f0ba5
JB
475 await pool.destroy()
476 })
477
2431bdb4 478 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
479 const pool = new FixedThreadPool(
480 numberOfWorkers,
481 './tests/worker-files/thread/testWorker.js'
482 )
483 expect(pool.opts.enableTasksQueue).toBe(false)
484 expect(pool.opts.tasksQueueOptions).toBeUndefined()
485 pool.enableTasksQueue(true)
486 expect(pool.opts.enableTasksQueue).toBe(true)
487 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
488 pool.enableTasksQueue(true, { concurrency: 2 })
489 expect(pool.opts.enableTasksQueue).toBe(true)
490 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
491 pool.enableTasksQueue(false)
492 expect(pool.opts.enableTasksQueue).toBe(false)
493 expect(pool.opts.tasksQueueOptions).toBeUndefined()
494 await pool.destroy()
495 })
496
2431bdb4 497 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
498 const pool = new FixedThreadPool(
499 numberOfWorkers,
500 './tests/worker-files/thread/testWorker.js',
501 { enableTasksQueue: true }
502 )
503 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
504 pool.setTasksQueueOptions({ concurrency: 2 })
505 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
506 expect(() =>
507 pool.setTasksQueueOptions('invalidTasksQueueOptions')
8735b4e5
JB
508 ).toThrowError(
509 new TypeError('Invalid tasks queue options: must be a plain object')
510 )
a20f0ba5 511 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
8735b4e5
JB
512 new Error(
513 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
514 )
515 )
516 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
517 new Error(
518 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
519 )
a20f0ba5 520 )
f0d7f803 521 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
8735b4e5 522 new TypeError('Invalid worker tasks concurrency: must be an integer')
f0d7f803 523 )
a20f0ba5
JB
524 await pool.destroy()
525 })
526
6b27d407
JB
527 it('Verify that pool info is set', async () => {
528 let pool = new FixedThreadPool(
529 numberOfWorkers,
530 './tests/worker-files/thread/testWorker.js'
531 )
2dca6cab
JB
532 expect(pool.info).toStrictEqual({
533 version,
534 type: PoolTypes.fixed,
535 worker: WorkerTypes.thread,
536 ready: true,
537 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
538 minSize: numberOfWorkers,
539 maxSize: numberOfWorkers,
540 workerNodes: numberOfWorkers,
541 idleWorkerNodes: numberOfWorkers,
542 busyWorkerNodes: 0,
543 executedTasks: 0,
544 executingTasks: 0,
2dca6cab
JB
545 failedTasks: 0
546 })
6b27d407
JB
547 await pool.destroy()
548 pool = new DynamicClusterPool(
2431bdb4 549 Math.floor(numberOfWorkers / 2),
6b27d407 550 numberOfWorkers,
ecdfbdc0 551 './tests/worker-files/cluster/testWorker.js'
6b27d407 552 )
2dca6cab
JB
553 expect(pool.info).toStrictEqual({
554 version,
555 type: PoolTypes.dynamic,
556 worker: WorkerTypes.cluster,
557 ready: true,
558 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
559 minSize: Math.floor(numberOfWorkers / 2),
560 maxSize: numberOfWorkers,
561 workerNodes: Math.floor(numberOfWorkers / 2),
562 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
563 busyWorkerNodes: 0,
564 executedTasks: 0,
565 executingTasks: 0,
2dca6cab
JB
566 failedTasks: 0
567 })
6b27d407
JB
568 await pool.destroy()
569 })
570
2431bdb4 571 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
572 const pool = new FixedClusterPool(
573 numberOfWorkers,
574 './tests/worker-files/cluster/testWorker.js'
575 )
f06e48d8 576 for (const workerNode of pool.workerNodes) {
465b2940 577 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
578 tasks: {
579 executed: 0,
580 executing: 0,
581 queued: 0,
df593701 582 maxQueued: 0,
a4e07f72
JB
583 failed: 0
584 },
585 runTime: {
a4e07f72
JB
586 history: expect.any(CircularArray)
587 },
588 waitTime: {
a4e07f72
JB
589 history: expect.any(CircularArray)
590 },
5df69fab
JB
591 elu: {
592 idle: {
5df69fab
JB
593 history: expect.any(CircularArray)
594 },
595 active: {
5df69fab 596 history: expect.any(CircularArray)
f7510105 597 }
5df69fab 598 }
86bf340d 599 })
f06e48d8
JB
600 }
601 await pool.destroy()
602 })
603
2431bdb4
JB
604 it('Verify that pool worker tasks queue are initialized', async () => {
605 let pool = new FixedClusterPool(
f06e48d8
JB
606 numberOfWorkers,
607 './tests/worker-files/cluster/testWorker.js'
608 )
609 for (const workerNode of pool.workerNodes) {
610 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 611 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 612 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 613 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 614 }
fd7ebd49 615 await pool.destroy()
2431bdb4
JB
616 pool = new DynamicThreadPool(
617 Math.floor(numberOfWorkers / 2),
618 numberOfWorkers,
619 './tests/worker-files/thread/testWorker.js'
620 )
621 for (const workerNode of pool.workerNodes) {
622 expect(workerNode.tasksQueue).toBeDefined()
623 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
624 expect(workerNode.tasksQueue.size).toBe(0)
625 expect(workerNode.tasksQueue.maxSize).toBe(0)
626 }
627 })
628
629 it('Verify that pool worker info are initialized', async () => {
630 let pool = new FixedClusterPool(
631 numberOfWorkers,
632 './tests/worker-files/cluster/testWorker.js'
633 )
2dca6cab
JB
634 for (const workerNode of pool.workerNodes) {
635 expect(workerNode.info).toStrictEqual({
636 id: expect.any(Number),
637 type: WorkerTypes.cluster,
638 dynamic: false,
639 ready: true
640 })
641 }
2431bdb4
JB
642 await pool.destroy()
643 pool = new DynamicThreadPool(
644 Math.floor(numberOfWorkers / 2),
645 numberOfWorkers,
646 './tests/worker-files/thread/testWorker.js'
647 )
2dca6cab
JB
648 for (const workerNode of pool.workerNodes) {
649 expect(workerNode.info).toStrictEqual({
650 id: expect.any(Number),
651 type: WorkerTypes.thread,
652 dynamic: false,
7884d183 653 ready: true
2dca6cab
JB
654 })
655 }
bf9549ae
JB
656 })
657
2431bdb4 658 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
659 const pool = new FixedClusterPool(
660 numberOfWorkers,
661 './tests/worker-files/cluster/testWorker.js'
662 )
09c2d0d3 663 const promises = new Set()
fc027381
JB
664 const maxMultiplier = 2
665 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 666 promises.add(pool.execute())
bf9549ae 667 }
f06e48d8 668 for (const workerNode of pool.workerNodes) {
465b2940 669 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
670 tasks: {
671 executed: 0,
672 executing: maxMultiplier,
673 queued: 0,
df593701 674 maxQueued: 0,
a4e07f72
JB
675 failed: 0
676 },
677 runTime: {
a4e07f72
JB
678 history: expect.any(CircularArray)
679 },
680 waitTime: {
a4e07f72
JB
681 history: expect.any(CircularArray)
682 },
5df69fab
JB
683 elu: {
684 idle: {
5df69fab
JB
685 history: expect.any(CircularArray)
686 },
687 active: {
5df69fab 688 history: expect.any(CircularArray)
f7510105 689 }
5df69fab 690 }
86bf340d 691 })
bf9549ae
JB
692 }
693 await Promise.all(promises)
f06e48d8 694 for (const workerNode of pool.workerNodes) {
465b2940 695 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
696 tasks: {
697 executed: maxMultiplier,
698 executing: 0,
699 queued: 0,
df593701 700 maxQueued: 0,
a4e07f72
JB
701 failed: 0
702 },
703 runTime: {
a4e07f72
JB
704 history: expect.any(CircularArray)
705 },
706 waitTime: {
a4e07f72
JB
707 history: expect.any(CircularArray)
708 },
5df69fab
JB
709 elu: {
710 idle: {
5df69fab
JB
711 history: expect.any(CircularArray)
712 },
713 active: {
5df69fab 714 history: expect.any(CircularArray)
f7510105 715 }
5df69fab 716 }
86bf340d 717 })
bf9549ae 718 }
fd7ebd49 719 await pool.destroy()
bf9549ae
JB
720 })
721
2431bdb4 722 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 723 const pool = new DynamicThreadPool(
2431bdb4 724 Math.floor(numberOfWorkers / 2),
8f4878b7 725 numberOfWorkers,
9e619829
JB
726 './tests/worker-files/thread/testWorker.js'
727 )
09c2d0d3 728 const promises = new Set()
ee9f5295
JB
729 const maxMultiplier = 2
730 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 731 promises.add(pool.execute())
9e619829
JB
732 }
733 await Promise.all(promises)
f06e48d8 734 for (const workerNode of pool.workerNodes) {
465b2940 735 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
736 tasks: {
737 executed: expect.any(Number),
738 executing: 0,
739 queued: 0,
df593701 740 maxQueued: 0,
a4e07f72
JB
741 failed: 0
742 },
743 runTime: {
a4e07f72
JB
744 history: expect.any(CircularArray)
745 },
746 waitTime: {
a4e07f72
JB
747 history: expect.any(CircularArray)
748 },
5df69fab
JB
749 elu: {
750 idle: {
5df69fab
JB
751 history: expect.any(CircularArray)
752 },
753 active: {
5df69fab 754 history: expect.any(CircularArray)
f7510105 755 }
5df69fab 756 }
86bf340d 757 })
465b2940 758 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
94407def
JB
759 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
760 numberOfWorkers * maxMultiplier
761 )
b97d82d8
JB
762 expect(workerNode.usage.runTime.history.length).toBe(0)
763 expect(workerNode.usage.waitTime.history.length).toBe(0)
764 expect(workerNode.usage.elu.idle.history.length).toBe(0)
765 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
766 }
767 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 768 for (const workerNode of pool.workerNodes) {
465b2940 769 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
770 tasks: {
771 executed: 0,
772 executing: 0,
773 queued: 0,
df593701 774 maxQueued: 0,
a4e07f72
JB
775 failed: 0
776 },
777 runTime: {
a4e07f72
JB
778 history: expect.any(CircularArray)
779 },
780 waitTime: {
a4e07f72
JB
781 history: expect.any(CircularArray)
782 },
5df69fab
JB
783 elu: {
784 idle: {
5df69fab
JB
785 history: expect.any(CircularArray)
786 },
787 active: {
5df69fab 788 history: expect.any(CircularArray)
f7510105 789 }
5df69fab 790 }
86bf340d 791 })
465b2940
JB
792 expect(workerNode.usage.runTime.history.length).toBe(0)
793 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
794 expect(workerNode.usage.elu.idle.history.length).toBe(0)
795 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 796 }
fd7ebd49 797 await pool.destroy()
ee11a4a2
JB
798 })
799
a1763c54
JB
800 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
801 const pool = new DynamicClusterPool(
2431bdb4 802 Math.floor(numberOfWorkers / 2),
164d950a 803 numberOfWorkers,
a1763c54 804 './tests/worker-files/cluster/testWorker.js'
164d950a 805 )
d46660cd 806 let poolInfo
a1763c54
JB
807 let poolReady = 0
808 pool.emitter.on(PoolEvents.ready, (info) => {
809 ++poolReady
d46660cd
JB
810 poolInfo = info
811 })
a1763c54
JB
812 await waitPoolEvents(pool, PoolEvents.ready, 1)
813 expect(poolReady).toBe(1)
d46660cd 814 expect(poolInfo).toStrictEqual({
23ccf9d7 815 version,
d46660cd 816 type: PoolTypes.dynamic,
a1763c54
JB
817 worker: WorkerTypes.cluster,
818 ready: true,
2431bdb4
JB
819 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
820 minSize: expect.any(Number),
821 maxSize: expect.any(Number),
822 workerNodes: expect.any(Number),
823 idleWorkerNodes: expect.any(Number),
824 busyWorkerNodes: expect.any(Number),
825 executedTasks: expect.any(Number),
826 executingTasks: expect.any(Number),
2431bdb4
JB
827 failedTasks: expect.any(Number)
828 })
829 await pool.destroy()
830 })
831
a1763c54
JB
832 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
833 const pool = new FixedThreadPool(
2431bdb4 834 numberOfWorkers,
a1763c54 835 './tests/worker-files/thread/testWorker.js'
2431bdb4 836 )
a1763c54
JB
837 const promises = new Set()
838 let poolBusy = 0
2431bdb4 839 let poolInfo
a1763c54
JB
840 pool.emitter.on(PoolEvents.busy, (info) => {
841 ++poolBusy
2431bdb4
JB
842 poolInfo = info
843 })
a1763c54
JB
844 for (let i = 0; i < numberOfWorkers * 2; i++) {
845 promises.add(pool.execute())
846 }
847 await Promise.all(promises)
848 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
849 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
850 expect(poolBusy).toBe(numberOfWorkers + 1)
2431bdb4
JB
851 expect(poolInfo).toStrictEqual({
852 version,
a1763c54
JB
853 type: PoolTypes.fixed,
854 worker: WorkerTypes.thread,
855 ready: expect.any(Boolean),
2431bdb4 856 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
857 minSize: expect.any(Number),
858 maxSize: expect.any(Number),
859 workerNodes: expect.any(Number),
860 idleWorkerNodes: expect.any(Number),
861 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
862 executedTasks: expect.any(Number),
863 executingTasks: expect.any(Number),
a4e07f72 864 failedTasks: expect.any(Number)
d46660cd 865 })
164d950a
JB
866 await pool.destroy()
867 })
868
a1763c54
JB
869 it("Verify that pool event emitter 'full' event can register a callback", async () => {
870 const pool = new DynamicThreadPool(
871 Math.floor(numberOfWorkers / 2),
7c0ba920
JB
872 numberOfWorkers,
873 './tests/worker-files/thread/testWorker.js'
874 )
09c2d0d3 875 const promises = new Set()
a1763c54 876 let poolFull = 0
d46660cd 877 let poolInfo
a1763c54
JB
878 pool.emitter.on(PoolEvents.full, (info) => {
879 ++poolFull
d46660cd
JB
880 poolInfo = info
881 })
7c0ba920 882 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 883 promises.add(pool.execute())
7c0ba920 884 }
cf597bc5 885 await Promise.all(promises)
33e6bb4c 886 expect(poolFull).toBe(1)
d46660cd 887 expect(poolInfo).toStrictEqual({
23ccf9d7 888 version,
a1763c54 889 type: PoolTypes.dynamic,
d46660cd 890 worker: WorkerTypes.thread,
2431bdb4
JB
891 ready: expect.any(Boolean),
892 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
8735b4e5
JB
893 minSize: expect.any(Number),
894 maxSize: expect.any(Number),
895 workerNodes: expect.any(Number),
896 idleWorkerNodes: expect.any(Number),
897 busyWorkerNodes: expect.any(Number),
898 executedTasks: expect.any(Number),
899 executingTasks: expect.any(Number),
900 failedTasks: expect.any(Number)
901 })
902 await pool.destroy()
903 })
904
3e8611a8 905 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
b1aae695 906 const pool = new FixedThreadPool(
8735b4e5
JB
907 numberOfWorkers,
908 './tests/worker-files/thread/testWorker.js',
909 {
910 enableTasksQueue: true
911 }
912 )
3e8611a8 913 sinon.stub(pool, 'hasBackPressure').returns(true)
8735b4e5
JB
914 const promises = new Set()
915 let poolBackPressure = 0
916 let poolInfo
917 pool.emitter.on(PoolEvents.backPressure, (info) => {
918 ++poolBackPressure
919 poolInfo = info
920 })
b1aae695 921 for (let i = 0; i < numberOfWorkers * 2; i++) {
8735b4e5
JB
922 promises.add(pool.execute())
923 }
924 await Promise.all(promises)
3e8611a8 925 expect(poolBackPressure).toBe(2)
8735b4e5
JB
926 expect(poolInfo).toStrictEqual({
927 version,
3e8611a8 928 type: PoolTypes.fixed,
8735b4e5
JB
929 worker: WorkerTypes.thread,
930 ready: expect.any(Boolean),
931 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
932 minSize: expect.any(Number),
933 maxSize: expect.any(Number),
934 workerNodes: expect.any(Number),
935 idleWorkerNodes: expect.any(Number),
936 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
937 executedTasks: expect.any(Number),
938 executingTasks: expect.any(Number),
3e8611a8
JB
939 maxQueuedTasks: expect.any(Number),
940 queuedTasks: expect.any(Number),
941 backPressure: true,
a4e07f72 942 failedTasks: expect.any(Number)
d46660cd 943 })
3e8611a8 944 expect(pool.hasBackPressure.called).toBe(true)
fd7ebd49 945 await pool.destroy()
7c0ba920 946 })
70a4f5ea 947
90d7d101
JB
948 it('Verify that listTaskFunctions() is working', async () => {
949 const dynamicThreadPool = new DynamicThreadPool(
950 Math.floor(numberOfWorkers / 2),
951 numberOfWorkers,
952 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
953 )
954 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
955 expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
956 'default',
957 'jsonIntegerSerialization',
958 'factorial',
959 'fibonacci'
960 ])
961 const fixedClusterPool = new FixedClusterPool(
962 numberOfWorkers,
963 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
964 )
965 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
966 expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
967 'default',
968 'jsonIntegerSerialization',
969 'factorial',
970 'fibonacci'
971 ])
972 })
973
974 it('Verify that multiple task functions worker is working', async () => {
70a4f5ea 975 const pool = new DynamicClusterPool(
2431bdb4 976 Math.floor(numberOfWorkers / 2),
70a4f5ea 977 numberOfWorkers,
90d7d101 978 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
70a4f5ea
JB
979 )
980 const data = { n: 10 }
82888165 981 const result0 = await pool.execute(data)
30b963d4 982 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 983 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 984 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
985 const result2 = await pool.execute(data, 'factorial')
986 expect(result2).toBe(3628800)
987 const result3 = await pool.execute(data, 'fibonacci')
024daf59 988 expect(result3).toBe(55)
5bb5be17
JB
989 expect(pool.info.executingTasks).toBe(0)
990 expect(pool.info.executedTasks).toBe(4)
b414b84c
JB
991 for (const workerNode of pool.workerNodes) {
992 expect(workerNode.info.taskFunctions).toStrictEqual([
993 'default',
994 'jsonIntegerSerialization',
995 'factorial',
996 'fibonacci'
997 ])
998 expect(workerNode.taskFunctionsUsage.size).toBe(3)
999 for (const name of pool.listTaskFunctions()) {
5bb5be17
JB
1000 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1001 tasks: {
1002 executed: expect.any(Number),
1003 executing: expect.any(Number),
1004 failed: 0,
1005 queued: 0
1006 },
1007 runTime: {
1008 history: expect.any(CircularArray)
1009 },
1010 waitTime: {
1011 history: expect.any(CircularArray)
1012 },
1013 elu: {
1014 idle: {
1015 history: expect.any(CircularArray)
1016 },
1017 active: {
1018 history: expect.any(CircularArray)
1019 }
1020 }
1021 })
1022 expect(
1023 workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
1024 ).toBeGreaterThanOrEqual(0)
1025 }
1026 }
70a4f5ea 1027 })
3ec964d6 1028})