fix: ensure worker node cannot be instantiaed without proper arguments
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 PoolTypes,
9 WorkerChoiceStrategies,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14 const { version } = require('../../../package.json')
15 const { waitPoolEvents } = require('../../test-utils')
16
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers = 2
19 class StubPoolWithIsMain extends FixedThreadPool {
20 isMain () {
21 return false
22 }
23 }
24
25 it('Simulate pool creation from a non main thread/process', () => {
26 expect(
27 () =>
28 new StubPoolWithIsMain(
29 numberOfWorkers,
30 './tests/worker-files/thread/testWorker.js',
31 {
32 errorHandler: (e) => console.error(e)
33 }
34 )
35 ).toThrowError(
36 'Cannot start a pool from a worker with the same type as the pool'
37 )
38 })
39
40 it('Verify that filePath is checked', () => {
41 const expectedError = new Error(
42 'Please specify a file with a worker implementation'
43 )
44 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
45 expectedError
46 )
47 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
48 expectedError
49 )
50 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
51 expectedError
52 )
53 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
54 expectedError
55 )
56 expect(
57 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
58 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 })
60
61 it('Verify that numberOfWorkers is checked', () => {
62 expect(() => new FixedThreadPool()).toThrowError(
63 'Cannot instantiate a pool without specifying the number of workers'
64 )
65 })
66
67 it('Verify that a negative number of workers is checked', () => {
68 expect(
69 () =>
70 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 ).toThrowError(
72 new RangeError(
73 'Cannot instantiate a pool with a negative number of workers'
74 )
75 )
76 })
77
78 it('Verify that a non integer number of workers is checked', () => {
79 expect(
80 () =>
81 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 ).toThrowError(
83 new TypeError(
84 'Cannot instantiate a pool with a non safe integer number of workers'
85 )
86 )
87 })
88
89 it('Verify that dynamic pool sizing is checked', () => {
90 expect(
91 () =>
92 new DynamicClusterPool(
93 1,
94 undefined,
95 './tests/worker-files/cluster/testWorker.js'
96 )
97 ).toThrowError(
98 new TypeError(
99 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
100 )
101 )
102 expect(
103 () =>
104 new DynamicThreadPool(
105 0.5,
106 1,
107 './tests/worker-files/thread/testWorker.js'
108 )
109 ).toThrowError(
110 new TypeError(
111 'Cannot instantiate a pool with a non safe integer number of workers'
112 )
113 )
114 expect(
115 () =>
116 new DynamicClusterPool(
117 0,
118 0.5,
119 './tests/worker-files/cluster/testWorker.js'
120 )
121 ).toThrowError(
122 new TypeError(
123 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
124 )
125 )
126 expect(
127 () =>
128 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
129 ).toThrowError(
130 new RangeError(
131 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
132 )
133 )
134 expect(
135 () =>
136 new DynamicClusterPool(
137 1,
138 1,
139 './tests/worker-files/cluster/testWorker.js'
140 )
141 ).toThrowError(
142 new RangeError(
143 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
144 )
145 )
146 expect(
147 () =>
148 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
149 ).toThrowError(
150 new RangeError(
151 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
152 )
153 )
154 })
155
156 it('Verify that pool options are checked', async () => {
157 let pool = new FixedThreadPool(
158 numberOfWorkers,
159 './tests/worker-files/thread/testWorker.js'
160 )
161 expect(pool.emitter).toBeDefined()
162 expect(pool.opts.enableEvents).toBe(true)
163 expect(pool.opts.restartWorkerOnError).toBe(true)
164 expect(pool.opts.enableTasksQueue).toBe(false)
165 expect(pool.opts.tasksQueueOptions).toBeUndefined()
166 expect(pool.opts.workerChoiceStrategy).toBe(
167 WorkerChoiceStrategies.ROUND_ROBIN
168 )
169 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
170 choiceRetries: 6,
171 runTime: { median: false },
172 waitTime: { median: false },
173 elu: { median: false }
174 })
175 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
176 choiceRetries: 6,
177 runTime: { median: false },
178 waitTime: { median: false },
179 elu: { median: false }
180 })
181 expect(pool.opts.messageHandler).toBeUndefined()
182 expect(pool.opts.errorHandler).toBeUndefined()
183 expect(pool.opts.onlineHandler).toBeUndefined()
184 expect(pool.opts.exitHandler).toBeUndefined()
185 await pool.destroy()
186 const testHandler = () => console.info('test handler executed')
187 pool = new FixedThreadPool(
188 numberOfWorkers,
189 './tests/worker-files/thread/testWorker.js',
190 {
191 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
192 workerChoiceStrategyOptions: {
193 runTime: { median: true },
194 weights: { 0: 300, 1: 200 }
195 },
196 enableEvents: false,
197 restartWorkerOnError: false,
198 enableTasksQueue: true,
199 tasksQueueOptions: { concurrency: 2 },
200 messageHandler: testHandler,
201 errorHandler: testHandler,
202 onlineHandler: testHandler,
203 exitHandler: testHandler
204 }
205 )
206 expect(pool.emitter).toBeUndefined()
207 expect(pool.opts.enableEvents).toBe(false)
208 expect(pool.opts.restartWorkerOnError).toBe(false)
209 expect(pool.opts.enableTasksQueue).toBe(true)
210 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
211 expect(pool.opts.workerChoiceStrategy).toBe(
212 WorkerChoiceStrategies.LEAST_USED
213 )
214 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
215 choiceRetries: 6,
216 runTime: { median: true },
217 waitTime: { median: false },
218 elu: { median: false },
219 weights: { 0: 300, 1: 200 }
220 })
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
222 choiceRetries: 6,
223 runTime: { median: true },
224 waitTime: { median: false },
225 elu: { median: false },
226 weights: { 0: 300, 1: 200 }
227 })
228 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
229 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
230 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
231 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
232 await pool.destroy()
233 })
234
235 it('Verify that pool options are validated', async () => {
236 expect(
237 () =>
238 new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.js',
241 {
242 workerChoiceStrategy: 'invalidStrategy'
243 }
244 )
245 ).toThrowError(
246 new Error("Invalid worker choice strategy 'invalidStrategy'")
247 )
248 expect(
249 () =>
250 new FixedThreadPool(
251 numberOfWorkers,
252 './tests/worker-files/thread/testWorker.js',
253 {
254 workerChoiceStrategyOptions: { weights: {} }
255 }
256 )
257 ).toThrowError(
258 new Error(
259 'Invalid worker choice strategy options: must have a weight for each worker node'
260 )
261 )
262 expect(
263 () =>
264 new FixedThreadPool(
265 numberOfWorkers,
266 './tests/worker-files/thread/testWorker.js',
267 {
268 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
269 }
270 )
271 ).toThrowError(
272 new Error(
273 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
274 )
275 )
276 expect(
277 () =>
278 new FixedThreadPool(
279 numberOfWorkers,
280 './tests/worker-files/thread/testWorker.js',
281 {
282 enableTasksQueue: true,
283 tasksQueueOptions: { concurrency: 0 }
284 }
285 )
286 ).toThrowError(
287 new TypeError(
288 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
289 )
290 )
291 expect(
292 () =>
293 new FixedThreadPool(
294 numberOfWorkers,
295 './tests/worker-files/thread/testWorker.js',
296 {
297 enableTasksQueue: true,
298 tasksQueueOptions: 'invalidTasksQueueOptions'
299 }
300 )
301 ).toThrowError(
302 new TypeError('Invalid tasks queue options: must be a plain object')
303 )
304 expect(
305 () =>
306 new FixedThreadPool(
307 numberOfWorkers,
308 './tests/worker-files/thread/testWorker.js',
309 {
310 enableTasksQueue: true,
311 tasksQueueOptions: { concurrency: 0.2 }
312 }
313 )
314 ).toThrowError(
315 new TypeError('Invalid worker tasks concurrency: must be an integer')
316 )
317 })
318
319 it('Verify that pool worker choice strategy options can be set', async () => {
320 const pool = new FixedThreadPool(
321 numberOfWorkers,
322 './tests/worker-files/thread/testWorker.js',
323 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
324 )
325 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
326 choiceRetries: 6,
327 runTime: { median: false },
328 waitTime: { median: false },
329 elu: { median: false }
330 })
331 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
332 choiceRetries: 6,
333 runTime: { median: false },
334 waitTime: { median: false },
335 elu: { median: false }
336 })
337 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
338 .workerChoiceStrategies) {
339 expect(workerChoiceStrategy.opts).toStrictEqual({
340 choiceRetries: 6,
341 runTime: { median: false },
342 waitTime: { median: false },
343 elu: { median: false }
344 })
345 }
346 expect(
347 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
348 ).toStrictEqual({
349 runTime: {
350 aggregate: true,
351 average: true,
352 median: false
353 },
354 waitTime: {
355 aggregate: false,
356 average: false,
357 median: false
358 },
359 elu: {
360 aggregate: true,
361 average: true,
362 median: false
363 }
364 })
365 pool.setWorkerChoiceStrategyOptions({
366 runTime: { median: true },
367 elu: { median: true }
368 })
369 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
370 choiceRetries: 6,
371 runTime: { median: true },
372 waitTime: { median: false },
373 elu: { median: true }
374 })
375 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
376 choiceRetries: 6,
377 runTime: { median: true },
378 waitTime: { median: false },
379 elu: { median: true }
380 })
381 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
382 .workerChoiceStrategies) {
383 expect(workerChoiceStrategy.opts).toStrictEqual({
384 choiceRetries: 6,
385 runTime: { median: true },
386 waitTime: { median: false },
387 elu: { median: true }
388 })
389 }
390 expect(
391 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
392 ).toStrictEqual({
393 runTime: {
394 aggregate: true,
395 average: false,
396 median: true
397 },
398 waitTime: {
399 aggregate: false,
400 average: false,
401 median: false
402 },
403 elu: {
404 aggregate: true,
405 average: false,
406 median: true
407 }
408 })
409 pool.setWorkerChoiceStrategyOptions({
410 runTime: { median: false },
411 elu: { median: false }
412 })
413 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
414 choiceRetries: 6,
415 runTime: { median: false },
416 waitTime: { median: false },
417 elu: { median: false }
418 })
419 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
420 choiceRetries: 6,
421 runTime: { median: false },
422 waitTime: { median: false },
423 elu: { median: false }
424 })
425 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
426 .workerChoiceStrategies) {
427 expect(workerChoiceStrategy.opts).toStrictEqual({
428 choiceRetries: 6,
429 runTime: { median: false },
430 waitTime: { median: false },
431 elu: { median: false }
432 })
433 }
434 expect(
435 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
436 ).toStrictEqual({
437 runTime: {
438 aggregate: true,
439 average: true,
440 median: false
441 },
442 waitTime: {
443 aggregate: false,
444 average: false,
445 median: false
446 },
447 elu: {
448 aggregate: true,
449 average: true,
450 median: false
451 }
452 })
453 expect(() =>
454 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
455 ).toThrowError(
456 new TypeError(
457 'Invalid worker choice strategy options: must be a plain object'
458 )
459 )
460 expect(() =>
461 pool.setWorkerChoiceStrategyOptions({ weights: {} })
462 ).toThrowError(
463 new Error(
464 'Invalid worker choice strategy options: must have a weight for each worker node'
465 )
466 )
467 expect(() =>
468 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
469 ).toThrowError(
470 new Error(
471 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
472 )
473 )
474 await pool.destroy()
475 })
476
477 it('Verify that pool tasks queue can be enabled/disabled', async () => {
478 const pool = new FixedThreadPool(
479 numberOfWorkers,
480 './tests/worker-files/thread/testWorker.js'
481 )
482 expect(pool.opts.enableTasksQueue).toBe(false)
483 expect(pool.opts.tasksQueueOptions).toBeUndefined()
484 pool.enableTasksQueue(true)
485 expect(pool.opts.enableTasksQueue).toBe(true)
486 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
487 pool.enableTasksQueue(true, { concurrency: 2 })
488 expect(pool.opts.enableTasksQueue).toBe(true)
489 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
490 pool.enableTasksQueue(false)
491 expect(pool.opts.enableTasksQueue).toBe(false)
492 expect(pool.opts.tasksQueueOptions).toBeUndefined()
493 await pool.destroy()
494 })
495
496 it('Verify that pool tasks queue options can be set', async () => {
497 const pool = new FixedThreadPool(
498 numberOfWorkers,
499 './tests/worker-files/thread/testWorker.js',
500 { enableTasksQueue: true }
501 )
502 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
503 pool.setTasksQueueOptions({ concurrency: 2 })
504 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
505 expect(() =>
506 pool.setTasksQueueOptions('invalidTasksQueueOptions')
507 ).toThrowError(
508 new TypeError('Invalid tasks queue options: must be a plain object')
509 )
510 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
511 new Error(
512 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
513 )
514 )
515 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
516 new Error(
517 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
518 )
519 )
520 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
521 new TypeError('Invalid worker tasks concurrency: must be an integer')
522 )
523 await pool.destroy()
524 })
525
526 it('Verify that pool info is set', async () => {
527 let pool = new FixedThreadPool(
528 numberOfWorkers,
529 './tests/worker-files/thread/testWorker.js'
530 )
531 expect(pool.info).toStrictEqual({
532 version,
533 type: PoolTypes.fixed,
534 worker: WorkerTypes.thread,
535 ready: true,
536 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
537 minSize: numberOfWorkers,
538 maxSize: numberOfWorkers,
539 workerNodes: numberOfWorkers,
540 idleWorkerNodes: numberOfWorkers,
541 busyWorkerNodes: 0,
542 executedTasks: 0,
543 executingTasks: 0,
544 failedTasks: 0
545 })
546 await pool.destroy()
547 pool = new DynamicClusterPool(
548 Math.floor(numberOfWorkers / 2),
549 numberOfWorkers,
550 './tests/worker-files/cluster/testWorker.js'
551 )
552 expect(pool.info).toStrictEqual({
553 version,
554 type: PoolTypes.dynamic,
555 worker: WorkerTypes.cluster,
556 ready: true,
557 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
558 minSize: Math.floor(numberOfWorkers / 2),
559 maxSize: numberOfWorkers,
560 workerNodes: Math.floor(numberOfWorkers / 2),
561 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
562 busyWorkerNodes: 0,
563 executedTasks: 0,
564 executingTasks: 0,
565 failedTasks: 0
566 })
567 await pool.destroy()
568 })
569
570 it('Verify that pool worker tasks usage are initialized', async () => {
571 const pool = new FixedClusterPool(
572 numberOfWorkers,
573 './tests/worker-files/cluster/testWorker.js'
574 )
575 for (const workerNode of pool.workerNodes) {
576 expect(workerNode.usage).toStrictEqual({
577 tasks: {
578 executed: 0,
579 executing: 0,
580 queued: 0,
581 maxQueued: 0,
582 failed: 0
583 },
584 runTime: {
585 history: expect.any(CircularArray)
586 },
587 waitTime: {
588 history: expect.any(CircularArray)
589 },
590 elu: {
591 idle: {
592 history: expect.any(CircularArray)
593 },
594 active: {
595 history: expect.any(CircularArray)
596 }
597 }
598 })
599 }
600 await pool.destroy()
601 })
602
603 it('Verify that pool worker tasks queue are initialized', async () => {
604 let pool = new FixedClusterPool(
605 numberOfWorkers,
606 './tests/worker-files/cluster/testWorker.js'
607 )
608 for (const workerNode of pool.workerNodes) {
609 expect(workerNode.tasksQueue).toBeDefined()
610 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
611 expect(workerNode.tasksQueue.size).toBe(0)
612 expect(workerNode.tasksQueue.maxSize).toBe(0)
613 }
614 await pool.destroy()
615 pool = new DynamicThreadPool(
616 Math.floor(numberOfWorkers / 2),
617 numberOfWorkers,
618 './tests/worker-files/thread/testWorker.js'
619 )
620 for (const workerNode of pool.workerNodes) {
621 expect(workerNode.tasksQueue).toBeDefined()
622 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
623 expect(workerNode.tasksQueue.size).toBe(0)
624 expect(workerNode.tasksQueue.maxSize).toBe(0)
625 }
626 })
627
628 it('Verify that pool worker info are initialized', async () => {
629 let pool = new FixedClusterPool(
630 numberOfWorkers,
631 './tests/worker-files/cluster/testWorker.js'
632 )
633 for (const workerNode of pool.workerNodes) {
634 expect(workerNode.info).toStrictEqual({
635 id: expect.any(Number),
636 type: WorkerTypes.cluster,
637 dynamic: false,
638 ready: true
639 })
640 }
641 await pool.destroy()
642 pool = new DynamicThreadPool(
643 Math.floor(numberOfWorkers / 2),
644 numberOfWorkers,
645 './tests/worker-files/thread/testWorker.js'
646 )
647 for (const workerNode of pool.workerNodes) {
648 expect(workerNode.info).toStrictEqual({
649 id: expect.any(Number),
650 type: WorkerTypes.thread,
651 dynamic: false,
652 ready: true
653 })
654 }
655 })
656
657 it('Verify that pool worker tasks usage are computed', async () => {
658 const pool = new FixedClusterPool(
659 numberOfWorkers,
660 './tests/worker-files/cluster/testWorker.js'
661 )
662 const promises = new Set()
663 const maxMultiplier = 2
664 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
665 promises.add(pool.execute())
666 }
667 for (const workerNode of pool.workerNodes) {
668 expect(workerNode.usage).toStrictEqual({
669 tasks: {
670 executed: 0,
671 executing: maxMultiplier,
672 queued: 0,
673 maxQueued: 0,
674 failed: 0
675 },
676 runTime: {
677 history: expect.any(CircularArray)
678 },
679 waitTime: {
680 history: expect.any(CircularArray)
681 },
682 elu: {
683 idle: {
684 history: expect.any(CircularArray)
685 },
686 active: {
687 history: expect.any(CircularArray)
688 }
689 }
690 })
691 }
692 await Promise.all(promises)
693 for (const workerNode of pool.workerNodes) {
694 expect(workerNode.usage).toStrictEqual({
695 tasks: {
696 executed: maxMultiplier,
697 executing: 0,
698 queued: 0,
699 maxQueued: 0,
700 failed: 0
701 },
702 runTime: {
703 history: expect.any(CircularArray)
704 },
705 waitTime: {
706 history: expect.any(CircularArray)
707 },
708 elu: {
709 idle: {
710 history: expect.any(CircularArray)
711 },
712 active: {
713 history: expect.any(CircularArray)
714 }
715 }
716 })
717 }
718 await pool.destroy()
719 })
720
721 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
722 const pool = new DynamicThreadPool(
723 Math.floor(numberOfWorkers / 2),
724 numberOfWorkers,
725 './tests/worker-files/thread/testWorker.js'
726 )
727 const promises = new Set()
728 const maxMultiplier = 2
729 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
730 promises.add(pool.execute())
731 }
732 await Promise.all(promises)
733 for (const workerNode of pool.workerNodes) {
734 expect(workerNode.usage).toStrictEqual({
735 tasks: {
736 executed: expect.any(Number),
737 executing: 0,
738 queued: 0,
739 maxQueued: 0,
740 failed: 0
741 },
742 runTime: {
743 history: expect.any(CircularArray)
744 },
745 waitTime: {
746 history: expect.any(CircularArray)
747 },
748 elu: {
749 idle: {
750 history: expect.any(CircularArray)
751 },
752 active: {
753 history: expect.any(CircularArray)
754 }
755 }
756 })
757 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
758 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
759 expect(workerNode.usage.runTime.history.length).toBe(0)
760 expect(workerNode.usage.waitTime.history.length).toBe(0)
761 expect(workerNode.usage.elu.idle.history.length).toBe(0)
762 expect(workerNode.usage.elu.active.history.length).toBe(0)
763 }
764 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
765 for (const workerNode of pool.workerNodes) {
766 expect(workerNode.usage).toStrictEqual({
767 tasks: {
768 executed: 0,
769 executing: 0,
770 queued: 0,
771 maxQueued: 0,
772 failed: 0
773 },
774 runTime: {
775 history: expect.any(CircularArray)
776 },
777 waitTime: {
778 history: expect.any(CircularArray)
779 },
780 elu: {
781 idle: {
782 history: expect.any(CircularArray)
783 },
784 active: {
785 history: expect.any(CircularArray)
786 }
787 }
788 })
789 expect(workerNode.usage.runTime.history.length).toBe(0)
790 expect(workerNode.usage.waitTime.history.length).toBe(0)
791 expect(workerNode.usage.elu.idle.history.length).toBe(0)
792 expect(workerNode.usage.elu.active.history.length).toBe(0)
793 }
794 await pool.destroy()
795 })
796
797 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
798 const pool = new DynamicClusterPool(
799 Math.floor(numberOfWorkers / 2),
800 numberOfWorkers,
801 './tests/worker-files/cluster/testWorker.js'
802 )
803 let poolInfo
804 let poolReady = 0
805 pool.emitter.on(PoolEvents.ready, (info) => {
806 ++poolReady
807 poolInfo = info
808 })
809 await waitPoolEvents(pool, PoolEvents.ready, 1)
810 expect(poolReady).toBe(1)
811 expect(poolInfo).toStrictEqual({
812 version,
813 type: PoolTypes.dynamic,
814 worker: WorkerTypes.cluster,
815 ready: true,
816 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
817 minSize: expect.any(Number),
818 maxSize: expect.any(Number),
819 workerNodes: expect.any(Number),
820 idleWorkerNodes: expect.any(Number),
821 busyWorkerNodes: expect.any(Number),
822 executedTasks: expect.any(Number),
823 executingTasks: expect.any(Number),
824 failedTasks: expect.any(Number)
825 })
826 await pool.destroy()
827 })
828
829 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
830 const pool = new FixedThreadPool(
831 numberOfWorkers,
832 './tests/worker-files/thread/testWorker.js'
833 )
834 const promises = new Set()
835 let poolBusy = 0
836 let poolInfo
837 pool.emitter.on(PoolEvents.busy, (info) => {
838 ++poolBusy
839 poolInfo = info
840 })
841 for (let i = 0; i < numberOfWorkers * 2; i++) {
842 promises.add(pool.execute())
843 }
844 await Promise.all(promises)
845 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
846 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
847 expect(poolBusy).toBe(numberOfWorkers + 1)
848 expect(poolInfo).toStrictEqual({
849 version,
850 type: PoolTypes.fixed,
851 worker: WorkerTypes.thread,
852 ready: expect.any(Boolean),
853 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
854 minSize: expect.any(Number),
855 maxSize: expect.any(Number),
856 workerNodes: expect.any(Number),
857 idleWorkerNodes: expect.any(Number),
858 busyWorkerNodes: expect.any(Number),
859 executedTasks: expect.any(Number),
860 executingTasks: expect.any(Number),
861 failedTasks: expect.any(Number)
862 })
863 await pool.destroy()
864 })
865
866 it("Verify that pool event emitter 'full' event can register a callback", async () => {
867 const pool = new DynamicThreadPool(
868 Math.floor(numberOfWorkers / 2),
869 numberOfWorkers,
870 './tests/worker-files/thread/testWorker.js'
871 )
872 const promises = new Set()
873 let poolFull = 0
874 let poolInfo
875 pool.emitter.on(PoolEvents.full, (info) => {
876 ++poolFull
877 poolInfo = info
878 })
879 for (let i = 0; i < numberOfWorkers * 2; i++) {
880 promises.add(pool.execute())
881 }
882 await Promise.all(promises)
883 expect(poolFull).toBe(1)
884 expect(poolInfo).toStrictEqual({
885 version,
886 type: PoolTypes.dynamic,
887 worker: WorkerTypes.thread,
888 ready: expect.any(Boolean),
889 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
890 minSize: expect.any(Number),
891 maxSize: expect.any(Number),
892 workerNodes: expect.any(Number),
893 idleWorkerNodes: expect.any(Number),
894 busyWorkerNodes: expect.any(Number),
895 executedTasks: expect.any(Number),
896 executingTasks: expect.any(Number),
897 failedTasks: expect.any(Number)
898 })
899 await pool.destroy()
900 })
901
902 it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
903 const pool = new DynamicThreadPool(
904 Math.floor(numberOfWorkers / 2),
905 numberOfWorkers,
906 './tests/worker-files/thread/testWorker.js',
907 {
908 enableTasksQueue: true
909 }
910 )
911 const promises = new Set()
912 let poolBackPressure = 0
913 let poolInfo
914 pool.emitter.on(PoolEvents.backPressure, (info) => {
915 ++poolBackPressure
916 poolInfo = info
917 })
918 for (let i = 0; i < Math.pow(numberOfWorkers, 2); i++) {
919 promises.add(pool.execute())
920 }
921 await Promise.all(promises)
922 expect(poolBackPressure).toBe(1)
923 expect(poolInfo).toStrictEqual({
924 version,
925 type: PoolTypes.dynamic,
926 worker: WorkerTypes.thread,
927 ready: expect.any(Boolean),
928 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
929 minSize: expect.any(Number),
930 maxSize: expect.any(Number),
931 workerNodes: expect.any(Number),
932 idleWorkerNodes: expect.any(Number),
933 busyWorkerNodes: expect.any(Number),
934 executedTasks: expect.any(Number),
935 executingTasks: expect.any(Number),
936 failedTasks: expect.any(Number)
937 })
938 await pool.destroy()
939 })
940
941 it('Verify that listTaskFunctions() is working', async () => {
942 const dynamicThreadPool = new DynamicThreadPool(
943 Math.floor(numberOfWorkers / 2),
944 numberOfWorkers,
945 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
946 )
947 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
948 expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
949 'default',
950 'jsonIntegerSerialization',
951 'factorial',
952 'fibonacci'
953 ])
954 const fixedClusterPool = new FixedClusterPool(
955 numberOfWorkers,
956 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
957 )
958 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
959 expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
960 'default',
961 'jsonIntegerSerialization',
962 'factorial',
963 'fibonacci'
964 ])
965 })
966
967 it('Verify that multiple task functions worker is working', async () => {
968 const pool = new DynamicClusterPool(
969 Math.floor(numberOfWorkers / 2),
970 numberOfWorkers,
971 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
972 )
973 const data = { n: 10 }
974 const result0 = await pool.execute(data)
975 expect(result0).toStrictEqual({ ok: 1 })
976 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
977 expect(result1).toStrictEqual({ ok: 1 })
978 const result2 = await pool.execute(data, 'factorial')
979 expect(result2).toBe(3628800)
980 const result3 = await pool.execute(data, 'fibonacci')
981 expect(result3).toBe(55)
982 expect(pool.info.executingTasks).toBe(0)
983 expect(pool.info.executedTasks).toBe(4)
984 for (const workerNode of pool.workerNodes) {
985 expect(workerNode.info.taskFunctions).toStrictEqual([
986 'default',
987 'jsonIntegerSerialization',
988 'factorial',
989 'fibonacci'
990 ])
991 expect(workerNode.taskFunctionsUsage.size).toBe(3)
992 for (const name of pool.listTaskFunctions()) {
993 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
994 tasks: {
995 executed: expect.any(Number),
996 executing: expect.any(Number),
997 failed: 0,
998 queued: 0
999 },
1000 runTime: {
1001 history: expect.any(CircularArray)
1002 },
1003 waitTime: {
1004 history: expect.any(CircularArray)
1005 },
1006 elu: {
1007 idle: {
1008 history: expect.any(CircularArray)
1009 },
1010 active: {
1011 history: expect.any(CircularArray)
1012 }
1013 }
1014 })
1015 expect(
1016 workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
1017 ).toBeGreaterThanOrEqual(0)
1018 }
1019 }
1020 })
1021 })