fix: ensure worker node cannot be instantiated without proper arguments
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
184855e6 8 PoolTypes,
3d6dd312 9 WorkerChoiceStrategies,
184855e6 10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
23ccf9d7 14const { version } = require('../../../package.json')
2431bdb4 15const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
16
17describe('Abstract pool test suite', () => {
fc027381 18 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
3ec964d6 24
it('Simulate pool creation from a non main thread/process', () => {
  // The stub answers `false` from isMain(), so the constructor is expected to throw.
  const poolOptions = {
    errorHandler: (e) => console.error(e)
  }
  const createStubPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )
  expect(createStubPool).toThrowError(
    'Cannot start a pool from a worker with the same type as the pool'
  )
})
c510fea7
APA
39
it('Verify that filePath is checked', () => {
  const missingFileError = new Error(
    'Please specify a file with a worker implementation'
  )
  // File path argument omitted entirely.
  expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
    missingFileError
  )
  // Values passed where a file path is expected.
  for (const invalidFilePath of ['', 0, true]) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, invalidFilePath)
    ).toThrowError(missingFileError)
  }
  // A path for which the "cannot find" error is expected.
  const unknownFilePath = './dummyWorker.ts'
  expect(
    () => new FixedThreadPool(numberOfWorkers, unknownFilePath)
  ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
})
60
it('Verify that numberOfWorkers is checked', () => {
  // Constructing a pool with no arguments at all must be rejected.
  const createPoolWithoutArguments = () => new FixedThreadPool()
  expect(createPoolWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
66
it('Verify that a negative number of workers is checked', () => {
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
77
it('Verify that a non integer number of workers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
7c0ba920 88
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each row: pool class, minimum size, maximum size, worker file, expected error.
  const invalidSizings = [
    [
      DynamicClusterPool,
      1,
      undefined,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      0.5,
      1,
      threadWorkerFile,
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      DynamicClusterPool,
      0,
      0.5,
      clusterWorkerFile,
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      DynamicThreadPool,
      2,
      1,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      DynamicClusterPool,
      1,
      1,
      clusterWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ],
    [
      DynamicThreadPool,
      0,
      0,
      threadWorkerFile,
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ]
  ]
  for (const [PoolClass, min, max, filePath, expectedError] of invalidSizings) {
    expect(() => new PoolClass(min, max, filePath)).toThrowError(expectedError)
  }
})
155
it('Verify that pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const handlerNames = [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]

  // First pool: built without an options argument.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFilePath)
  const expectedDefaultStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  }
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    expectedDefaultStrategyOptions
  )
  expect(defaultPool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedDefaultStrategyOptions
  )
  for (const handlerName of handlerNames) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Second pool: every option given explicitly.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFilePath, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  const expectedCustomStrategyOptions = {
    choiceRetries: 6,
    runTime: { median: true },
    waitTime: { median: false },
    elu: { median: false },
    weights: { 0: 300, 1: 200 }
  }
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual(
    expectedCustomStrategyOptions
  )
  expect(customPool.workerChoiceStrategyContext.opts).toStrictEqual(
    expectedCustomStrategyOptions
  )
  for (const handlerName of handlerNames) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
234
it('Verify that pool options are validated', async () => {
  // Each row: pool options handed to the constructor, error expected from it.
  const invalidOptions = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      new Error("Invalid worker choice strategy 'invalidStrategy'")
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      new TypeError(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      new TypeError('Invalid worker tasks concurrency: must be an integer')
    ]
  ]
  for (const [poolOptions, expectedError] of invalidOptions) {
    expect(
      () =>
        new FixedThreadPool(
          numberOfWorkers,
          './tests/worker-files/thread/testWorker.js',
          poolOptions
        )
    ).toThrowError(expectedError)
  }
})
318
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )
  // Asserts the options seen by the pool, the strategy context and every
  // registered strategy, then the resulting task statistics requirements.
  // `median` is the value expected for both the runTime and elu median flags.
  const expectStrategyOptions = (median) => {
    const expectedOptions = {
      choiceRetries: 6,
      runTime: { median },
      waitTime: { median: false },
      elu: { median }
    }
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    expect(pool.workerChoiceStrategyContext.opts).toStrictEqual(
      expectedOptions
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  expectStrategyOptions(false)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyOptions(true)
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyOptions(false)

  // Each row: rejected options, error expected from the setter.
  const rejectedOptions = [
    [
      'invalidWorkerChoiceStrategyOptions',
      new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    ],
    [
      { weights: {} },
      new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    ],
    [
      { measurement: 'invalidMeasurement' },
      new Error(
        "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
      )
    ]
  ]
  for (const [strategyOptions, expectedError] of rejectedOptions) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(strategyOptions)
    ).toThrowError(expectedError)
  }
  await pool.destroy()
})
476
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const expectDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  const expectEnabledWith = (tasksQueueOptions) => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual(tasksQueueOptions)
  }

  expectDisabled()
  pool.enableTasksQueue(true)
  expectEnabledWith({ concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectEnabledWith({ concurrency: 2 })
  pool.enableTasksQueue(false)
  expectDisabled()
  await pool.destroy()
})
495
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })

  // Each row: rejected tasks queue options, error expected from the setter.
  const rejectedOptions = [
    [
      'invalidTasksQueueOptions',
      new TypeError('Invalid tasks queue options: must be a plain object')
    ],
    [
      { concurrency: 0 },
      new Error(
        'Invalid worker tasks concurrency: 0 is a negative integer or zero'
      )
    ],
    [
      { concurrency: -1 },
      new Error(
        'Invalid worker tasks concurrency: -1 is a negative integer or zero'
      )
    ],
    [
      { concurrency: 0.2 },
      new TypeError('Invalid worker tasks concurrency: must be an integer')
    ]
  ]
  for (const [tasksQueueOptions, expectedError] of rejectedOptions) {
    expect(() => pool.setTasksQueueOptions(tasksQueueOptions)).toThrowError(
      expectedError
    )
  }
  await pool.destroy()
})
525
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Fixed thread pool.
  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await fixedPool.destroy()

  // Dynamic cluster pool whose minimum size is half of its maximum size.
  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  })
  await dynamicPool.destroy()
})
569
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Builds a fresh matcher for a measurement that only exposes its history.
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual({
      tasks: { executed: 0, executing: 0, queued: 0, maxQueued: 0, failed: 0 },
      runTime: anyHistory(),
      waitTime: anyHistory(),
      elu: { idle: anyHistory(), active: anyHistory() }
    })
  }
  await pool.destroy()
})
602
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Fixed cluster pool: every worker node starts with an empty tasks queue.
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
  // Dynamic thread pool: same expectations on its worker nodes.
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  // The second pool must be destroyed too, otherwise its workers outlive the test.
  await pool.destroy()
})
627
it('Verify that pool worker info are initialized', async () => {
  // Fixed cluster pool: worker nodes report a cluster, non dynamic, ready worker.
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()
  // Dynamic thread pool: worker nodes report a thread, non dynamic, ready worker.
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true
    })
  }
  // The second pool must be destroyed too, otherwise its workers outlive the test.
  await pool.destroy()
})
656
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const maxMultiplier = 2
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Asserts the usage of every worker node for the given task counters.
  const expectUsageOnEachWorkerNode = ({ executed, executing }) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.usage).toStrictEqual({
        tasks: { executed, executing, queued: 0, maxQueued: 0, failed: 0 },
        runTime: anyHistory(),
        waitTime: anyHistory(),
        elu: { idle: anyHistory(), active: anyHistory() }
      })
    }
  }

  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // Submitted but not yet awaited: all tasks are counted as executing.
  expectUsageOnEachWorkerNode({ executed: 0, executing: maxMultiplier })
  await Promise.all(promises)
  // All tasks settled: they are now counted as executed.
  expectUsageOnEachWorkerNode({ executed: maxMultiplier, executing: 0 })
  await pool.destroy()
})
720
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const maxMultiplier = 2
  const anyHistory = () => ({ history: expect.any(CircularArray) })
  // Asserts the usage shape of one worker node; `executed` may be a matcher.
  const expectUsage = (workerNode, executed) => {
    expect(workerNode.usage).toStrictEqual({
      tasks: { executed, executing: 0, queued: 0, maxQueued: 0, failed: 0 },
      runTime: anyHistory(),
      waitTime: anyHistory(),
      elu: { idle: anyHistory(), active: anyHistory() }
    })
  }
  // Asserts that every measurement history of one worker node is empty.
  const expectEmptyHistories = (workerNode) => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }

  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  for (const workerNode of pool.workerNodes) {
    expectUsage(workerNode, expect.any(Number))
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expectEmptyHistories(workerNode)
  }
  // Switching strategy is expected to bring the executed counter back to zero.
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  for (const workerNode of pool.workerNodes) {
    expectUsage(workerNode, 0)
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
796
a1763c54
JB
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every payload received by the listener, in emission order.
  const readyInfos = []
  pool.emitter.on(PoolEvents.ready, (info) => {
    readyInfos.push(info)
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(readyInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(readyInfos[readyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
828
a1763c54
JB
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Every payload received by the listener, in emission order.
  const busyInfos = []
  pool.emitter.on(PoolEvents.busy, (info) => {
    busyInfos.push(info)
  })
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyInfos.length).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(busyInfos[busyInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
865
a1763c54
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Every payload received by the listener, in emission order.
  const fullInfos = []
  pool.emitter.on(PoolEvents.full, (info) => {
    fullInfos.push(info)
  })
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  expect(fullInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(fullInfos[fullInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
901
it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Every payload received by the listener, in emission order.
  const backPressureInfos = []
  pool.emitter.on(PoolEvents.backPressure, (info) => {
    backPressureInfos.push(info)
  })
  const promises = new Set()
  const submittedTasks = Math.pow(numberOfWorkers, 2)
  for (let i = 0; i < submittedTasks; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  expect(backPressureInfos.length).toBe(1)
  const anyNumber = expect.any(Number)
  expect(backPressureInfos[backPressureInfos.length - 1]).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
70a4f5ea 940
90d7d101
JB
it('Verify that listTaskFunctions() is working', async () => {
  // Dynamic thread pool: wait for the `ready` event before listing the task functions.
  const dynamicThreadPool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
  expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
    'default',
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Fixed cluster pool: same expectation on the listed task functions.
  const fixedClusterPool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
  expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
    'default',
    'jsonIntegerSerialization',
    'factorial',
    'fibonacci'
  ])
  // Both pools must be destroyed, otherwise their workers outlive the test.
  await dynamicThreadPool.destroy()
  await fixedClusterPool.destroy()
})
966
it('Verify that multiple task functions worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
  )
  const data = { n: 10 }
  // No task function name given on the first call.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  expect(pool.info.executingTasks).toBe(0)
  expect(pool.info.executedTasks).toBe(4)
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info.taskFunctions).toStrictEqual([
      'default',
      'jsonIntegerSerialization',
      'factorial',
      'fibonacci'
    ])
    expect(workerNode.taskFunctionsUsage.size).toBe(3)
    for (const name of pool.listTaskFunctions()) {
      expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
        tasks: {
          executed: expect.any(Number),
          executing: expect.any(Number),
          failed: 0,
          queued: 0
        },
        runTime: {
          history: expect.any(CircularArray)
        },
        waitTime: {
          history: expect.any(CircularArray)
        },
        elu: {
          idle: {
            history: expect.any(CircularArray)
          },
          active: {
            history: expect.any(CircularArray)
          }
        }
      })
      expect(
        workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
      ).toBeGreaterThanOrEqual(0)
    }
  }
  // The pool must be destroyed, otherwise its workers outlive the test.
  await pool.destroy()
})
3ec964d6 1021})