fix: ensure pool event full is emitted only once
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 PoolTypes,
9 WorkerChoiceStrategies,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14 const { version } = require('../../../package.json')
15 const { waitPoolEvents } = require('../../test-utils')
16
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers = 2
19 class StubPoolWithIsMain extends FixedThreadPool {
20 isMain () {
21 return false
22 }
23 }
24
25 it('Simulate pool creation from a non main thread/process', () => {
26 expect(
27 () =>
28 new StubPoolWithIsMain(
29 numberOfWorkers,
30 './tests/worker-files/thread/testWorker.js',
31 {
32 errorHandler: (e) => console.error(e)
33 }
34 )
35 ).toThrowError(
36 'Cannot start a pool from a worker with the same type as the pool'
37 )
38 })
39
40 it('Verify that filePath is checked', () => {
41 const expectedError = new Error(
42 'Please specify a file with a worker implementation'
43 )
44 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
45 expectedError
46 )
47 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
48 expectedError
49 )
50 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
51 expectedError
52 )
53 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
54 expectedError
55 )
56 expect(
57 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
58 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
59 })
60
61 it('Verify that numberOfWorkers is checked', () => {
62 expect(() => new FixedThreadPool()).toThrowError(
63 'Cannot instantiate a pool without specifying the number of workers'
64 )
65 })
66
67 it('Verify that a negative number of workers is checked', () => {
68 expect(
69 () =>
70 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
71 ).toThrowError(
72 new RangeError(
73 'Cannot instantiate a pool with a negative number of workers'
74 )
75 )
76 })
77
78 it('Verify that a non integer number of workers is checked', () => {
79 expect(
80 () =>
81 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
82 ).toThrowError(
83 new TypeError(
84 'Cannot instantiate a pool with a non safe integer number of workers'
85 )
86 )
87 })
88
89 it('Verify that dynamic pool sizing is checked', () => {
90 expect(
91 () =>
92 new DynamicClusterPool(
93 1,
94 undefined,
95 './tests/worker-files/cluster/testWorker.js'
96 )
97 ).toThrowError(
98 new TypeError(
99 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
100 )
101 )
102 expect(
103 () =>
104 new DynamicThreadPool(
105 0.5,
106 1,
107 './tests/worker-files/thread/testWorker.js'
108 )
109 ).toThrowError(
110 new TypeError(
111 'Cannot instantiate a pool with a non safe integer number of workers'
112 )
113 )
114 expect(
115 () =>
116 new DynamicClusterPool(
117 0,
118 0.5,
119 './tests/worker-files/cluster/testWorker.js'
120 )
121 ).toThrowError(
122 new TypeError(
123 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
124 )
125 )
126 expect(
127 () =>
128 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
129 ).toThrowError(
130 new RangeError(
131 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
132 )
133 )
134 expect(
135 () =>
136 new DynamicClusterPool(
137 1,
138 1,
139 './tests/worker-files/cluster/testWorker.js'
140 )
141 ).toThrowError(
142 new RangeError(
143 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
144 )
145 )
146 expect(
147 () =>
148 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
149 ).toThrowError(
150 new RangeError(
151 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
152 )
153 )
154 })
155
156 it('Verify that pool options are checked', async () => {
157 let pool = new FixedThreadPool(
158 numberOfWorkers,
159 './tests/worker-files/thread/testWorker.js'
160 )
161 expect(pool.emitter).toBeDefined()
162 expect(pool.opts.enableEvents).toBe(true)
163 expect(pool.opts.restartWorkerOnError).toBe(true)
164 expect(pool.opts.enableTasksQueue).toBe(false)
165 expect(pool.opts.tasksQueueOptions).toBeUndefined()
166 expect(pool.opts.workerChoiceStrategy).toBe(
167 WorkerChoiceStrategies.ROUND_ROBIN
168 )
169 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
170 choiceRetries: 6,
171 runTime: { median: false },
172 waitTime: { median: false },
173 elu: { median: false }
174 })
175 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
176 choiceRetries: 6,
177 runTime: { median: false },
178 waitTime: { median: false },
179 elu: { median: false }
180 })
181 expect(pool.opts.messageHandler).toBeUndefined()
182 expect(pool.opts.errorHandler).toBeUndefined()
183 expect(pool.opts.onlineHandler).toBeUndefined()
184 expect(pool.opts.exitHandler).toBeUndefined()
185 await pool.destroy()
186 const testHandler = () => console.info('test handler executed')
187 pool = new FixedThreadPool(
188 numberOfWorkers,
189 './tests/worker-files/thread/testWorker.js',
190 {
191 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
192 workerChoiceStrategyOptions: {
193 runTime: { median: true },
194 weights: { 0: 300, 1: 200 }
195 },
196 enableEvents: false,
197 restartWorkerOnError: false,
198 enableTasksQueue: true,
199 tasksQueueOptions: { concurrency: 2 },
200 messageHandler: testHandler,
201 errorHandler: testHandler,
202 onlineHandler: testHandler,
203 exitHandler: testHandler
204 }
205 )
206 expect(pool.emitter).toBeUndefined()
207 expect(pool.opts.enableEvents).toBe(false)
208 expect(pool.opts.restartWorkerOnError).toBe(false)
209 expect(pool.opts.enableTasksQueue).toBe(true)
210 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
211 expect(pool.opts.workerChoiceStrategy).toBe(
212 WorkerChoiceStrategies.LEAST_USED
213 )
214 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
215 choiceRetries: 6,
216 runTime: { median: true },
217 waitTime: { median: false },
218 elu: { median: false },
219 weights: { 0: 300, 1: 200 }
220 })
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
222 choiceRetries: 6,
223 runTime: { median: true },
224 waitTime: { median: false },
225 elu: { median: false },
226 weights: { 0: 300, 1: 200 }
227 })
228 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
229 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
230 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
231 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
232 await pool.destroy()
233 })
234
235 it('Verify that pool options are validated', async () => {
236 expect(
237 () =>
238 new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.js',
241 {
242 workerChoiceStrategy: 'invalidStrategy'
243 }
244 )
245 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
246 expect(
247 () =>
248 new FixedThreadPool(
249 numberOfWorkers,
250 './tests/worker-files/thread/testWorker.js',
251 {
252 workerChoiceStrategyOptions: { weights: {} }
253 }
254 )
255 ).toThrowError(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
258 expect(
259 () =>
260 new FixedThreadPool(
261 numberOfWorkers,
262 './tests/worker-files/thread/testWorker.js',
263 {
264 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
265 }
266 )
267 ).toThrowError(
268 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
269 )
270 expect(
271 () =>
272 new FixedThreadPool(
273 numberOfWorkers,
274 './tests/worker-files/thread/testWorker.js',
275 {
276 enableTasksQueue: true,
277 tasksQueueOptions: { concurrency: 0 }
278 }
279 )
280 ).toThrowError("Invalid worker tasks concurrency '0'")
281 expect(
282 () =>
283 new FixedThreadPool(
284 numberOfWorkers,
285 './tests/worker-files/thread/testWorker.js',
286 {
287 enableTasksQueue: true,
288 tasksQueueOptions: 'invalidTasksQueueOptions'
289 }
290 )
291 ).toThrowError('Invalid tasks queue options: must be a plain object')
292 expect(
293 () =>
294 new FixedThreadPool(
295 numberOfWorkers,
296 './tests/worker-files/thread/testWorker.js',
297 {
298 enableTasksQueue: true,
299 tasksQueueOptions: { concurrency: 0.2 }
300 }
301 )
302 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
303 })
304
305 it('Verify that pool worker choice strategy options can be set', async () => {
306 const pool = new FixedThreadPool(
307 numberOfWorkers,
308 './tests/worker-files/thread/testWorker.js',
309 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
310 )
311 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
312 choiceRetries: 6,
313 runTime: { median: false },
314 waitTime: { median: false },
315 elu: { median: false }
316 })
317 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
318 choiceRetries: 6,
319 runTime: { median: false },
320 waitTime: { median: false },
321 elu: { median: false }
322 })
323 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
324 .workerChoiceStrategies) {
325 expect(workerChoiceStrategy.opts).toStrictEqual({
326 choiceRetries: 6,
327 runTime: { median: false },
328 waitTime: { median: false },
329 elu: { median: false }
330 })
331 }
332 expect(
333 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
334 ).toStrictEqual({
335 runTime: {
336 aggregate: true,
337 average: true,
338 median: false
339 },
340 waitTime: {
341 aggregate: false,
342 average: false,
343 median: false
344 },
345 elu: {
346 aggregate: true,
347 average: true,
348 median: false
349 }
350 })
351 pool.setWorkerChoiceStrategyOptions({
352 runTime: { median: true },
353 elu: { median: true }
354 })
355 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
356 choiceRetries: 6,
357 runTime: { median: true },
358 waitTime: { median: false },
359 elu: { median: true }
360 })
361 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
362 choiceRetries: 6,
363 runTime: { median: true },
364 waitTime: { median: false },
365 elu: { median: true }
366 })
367 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
368 .workerChoiceStrategies) {
369 expect(workerChoiceStrategy.opts).toStrictEqual({
370 choiceRetries: 6,
371 runTime: { median: true },
372 waitTime: { median: false },
373 elu: { median: true }
374 })
375 }
376 expect(
377 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
378 ).toStrictEqual({
379 runTime: {
380 aggregate: true,
381 average: false,
382 median: true
383 },
384 waitTime: {
385 aggregate: false,
386 average: false,
387 median: false
388 },
389 elu: {
390 aggregate: true,
391 average: false,
392 median: true
393 }
394 })
395 pool.setWorkerChoiceStrategyOptions({
396 runTime: { median: false },
397 elu: { median: false }
398 })
399 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
400 choiceRetries: 6,
401 runTime: { median: false },
402 waitTime: { median: false },
403 elu: { median: false }
404 })
405 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
406 choiceRetries: 6,
407 runTime: { median: false },
408 waitTime: { median: false },
409 elu: { median: false }
410 })
411 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
412 .workerChoiceStrategies) {
413 expect(workerChoiceStrategy.opts).toStrictEqual({
414 choiceRetries: 6,
415 runTime: { median: false },
416 waitTime: { median: false },
417 elu: { median: false }
418 })
419 }
420 expect(
421 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
422 ).toStrictEqual({
423 runTime: {
424 aggregate: true,
425 average: true,
426 median: false
427 },
428 waitTime: {
429 aggregate: false,
430 average: false,
431 median: false
432 },
433 elu: {
434 aggregate: true,
435 average: true,
436 median: false
437 }
438 })
439 expect(() =>
440 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
441 ).toThrowError(
442 'Invalid worker choice strategy options: must be a plain object'
443 )
444 expect(() =>
445 pool.setWorkerChoiceStrategyOptions({ weights: {} })
446 ).toThrowError(
447 'Invalid worker choice strategy options: must have a weight for each worker node'
448 )
449 expect(() =>
450 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
451 ).toThrowError(
452 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
453 )
454 await pool.destroy()
455 })
456
457 it('Verify that pool tasks queue can be enabled/disabled', async () => {
458 const pool = new FixedThreadPool(
459 numberOfWorkers,
460 './tests/worker-files/thread/testWorker.js'
461 )
462 expect(pool.opts.enableTasksQueue).toBe(false)
463 expect(pool.opts.tasksQueueOptions).toBeUndefined()
464 pool.enableTasksQueue(true)
465 expect(pool.opts.enableTasksQueue).toBe(true)
466 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
467 pool.enableTasksQueue(true, { concurrency: 2 })
468 expect(pool.opts.enableTasksQueue).toBe(true)
469 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
470 pool.enableTasksQueue(false)
471 expect(pool.opts.enableTasksQueue).toBe(false)
472 expect(pool.opts.tasksQueueOptions).toBeUndefined()
473 await pool.destroy()
474 })
475
476 it('Verify that pool tasks queue options can be set', async () => {
477 const pool = new FixedThreadPool(
478 numberOfWorkers,
479 './tests/worker-files/thread/testWorker.js',
480 { enableTasksQueue: true }
481 )
482 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
483 pool.setTasksQueueOptions({ concurrency: 2 })
484 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
485 expect(() =>
486 pool.setTasksQueueOptions('invalidTasksQueueOptions')
487 ).toThrowError('Invalid tasks queue options: must be a plain object')
488 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
489 "Invalid worker tasks concurrency '0'"
490 )
491 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
492 'Invalid worker tasks concurrency: must be an integer'
493 )
494 await pool.destroy()
495 })
496
497 it('Verify that pool info is set', async () => {
498 let pool = new FixedThreadPool(
499 numberOfWorkers,
500 './tests/worker-files/thread/testWorker.js'
501 )
502 expect(pool.info).toStrictEqual({
503 version,
504 type: PoolTypes.fixed,
505 worker: WorkerTypes.thread,
506 ready: true,
507 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
508 minSize: numberOfWorkers,
509 maxSize: numberOfWorkers,
510 workerNodes: numberOfWorkers,
511 idleWorkerNodes: numberOfWorkers,
512 busyWorkerNodes: 0,
513 executedTasks: 0,
514 executingTasks: 0,
515 failedTasks: 0
516 })
517 await pool.destroy()
518 pool = new DynamicClusterPool(
519 Math.floor(numberOfWorkers / 2),
520 numberOfWorkers,
521 './tests/worker-files/cluster/testWorker.js'
522 )
523 expect(pool.info).toStrictEqual({
524 version,
525 type: PoolTypes.dynamic,
526 worker: WorkerTypes.cluster,
527 ready: true,
528 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
529 minSize: Math.floor(numberOfWorkers / 2),
530 maxSize: numberOfWorkers,
531 workerNodes: Math.floor(numberOfWorkers / 2),
532 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
533 busyWorkerNodes: 0,
534 executedTasks: 0,
535 executingTasks: 0,
536 failedTasks: 0
537 })
538 await pool.destroy()
539 })
540
541 it('Verify that pool worker tasks usage are initialized', async () => {
542 const pool = new FixedClusterPool(
543 numberOfWorkers,
544 './tests/worker-files/cluster/testWorker.js'
545 )
546 for (const workerNode of pool.workerNodes) {
547 expect(workerNode.usage).toStrictEqual({
548 tasks: {
549 executed: 0,
550 executing: 0,
551 queued: 0,
552 maxQueued: 0,
553 failed: 0
554 },
555 runTime: {
556 history: expect.any(CircularArray)
557 },
558 waitTime: {
559 history: expect.any(CircularArray)
560 },
561 elu: {
562 idle: {
563 history: expect.any(CircularArray)
564 },
565 active: {
566 history: expect.any(CircularArray)
567 }
568 }
569 })
570 }
571 await pool.destroy()
572 })
573
574 it('Verify that pool worker tasks queue are initialized', async () => {
575 let pool = new FixedClusterPool(
576 numberOfWorkers,
577 './tests/worker-files/cluster/testWorker.js'
578 )
579 for (const workerNode of pool.workerNodes) {
580 expect(workerNode.tasksQueue).toBeDefined()
581 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
582 expect(workerNode.tasksQueue.size).toBe(0)
583 expect(workerNode.tasksQueue.maxSize).toBe(0)
584 }
585 await pool.destroy()
586 pool = new DynamicThreadPool(
587 Math.floor(numberOfWorkers / 2),
588 numberOfWorkers,
589 './tests/worker-files/thread/testWorker.js'
590 )
591 for (const workerNode of pool.workerNodes) {
592 expect(workerNode.tasksQueue).toBeDefined()
593 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
594 expect(workerNode.tasksQueue.size).toBe(0)
595 expect(workerNode.tasksQueue.maxSize).toBe(0)
596 }
597 })
598
599 it('Verify that pool worker info are initialized', async () => {
600 let pool = new FixedClusterPool(
601 numberOfWorkers,
602 './tests/worker-files/cluster/testWorker.js'
603 )
604 for (const workerNode of pool.workerNodes) {
605 expect(workerNode.info).toStrictEqual({
606 id: expect.any(Number),
607 type: WorkerTypes.cluster,
608 dynamic: false,
609 ready: true
610 })
611 }
612 await pool.destroy()
613 pool = new DynamicThreadPool(
614 Math.floor(numberOfWorkers / 2),
615 numberOfWorkers,
616 './tests/worker-files/thread/testWorker.js'
617 )
618 for (const workerNode of pool.workerNodes) {
619 expect(workerNode.info).toStrictEqual({
620 id: expect.any(Number),
621 type: WorkerTypes.thread,
622 dynamic: false,
623 ready: true
624 })
625 }
626 })
627
628 it('Verify that pool worker tasks usage are computed', async () => {
629 const pool = new FixedClusterPool(
630 numberOfWorkers,
631 './tests/worker-files/cluster/testWorker.js'
632 )
633 const promises = new Set()
634 const maxMultiplier = 2
635 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
636 promises.add(pool.execute())
637 }
638 for (const workerNode of pool.workerNodes) {
639 expect(workerNode.usage).toStrictEqual({
640 tasks: {
641 executed: 0,
642 executing: maxMultiplier,
643 queued: 0,
644 maxQueued: 0,
645 failed: 0
646 },
647 runTime: {
648 history: expect.any(CircularArray)
649 },
650 waitTime: {
651 history: expect.any(CircularArray)
652 },
653 elu: {
654 idle: {
655 history: expect.any(CircularArray)
656 },
657 active: {
658 history: expect.any(CircularArray)
659 }
660 }
661 })
662 }
663 await Promise.all(promises)
664 for (const workerNode of pool.workerNodes) {
665 expect(workerNode.usage).toStrictEqual({
666 tasks: {
667 executed: maxMultiplier,
668 executing: 0,
669 queued: 0,
670 maxQueued: 0,
671 failed: 0
672 },
673 runTime: {
674 history: expect.any(CircularArray)
675 },
676 waitTime: {
677 history: expect.any(CircularArray)
678 },
679 elu: {
680 idle: {
681 history: expect.any(CircularArray)
682 },
683 active: {
684 history: expect.any(CircularArray)
685 }
686 }
687 })
688 }
689 await pool.destroy()
690 })
691
692 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
693 const pool = new DynamicThreadPool(
694 Math.floor(numberOfWorkers / 2),
695 numberOfWorkers,
696 './tests/worker-files/thread/testWorker.js'
697 )
698 const promises = new Set()
699 const maxMultiplier = 2
700 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
701 promises.add(pool.execute())
702 }
703 await Promise.all(promises)
704 for (const workerNode of pool.workerNodes) {
705 expect(workerNode.usage).toStrictEqual({
706 tasks: {
707 executed: expect.any(Number),
708 executing: 0,
709 queued: 0,
710 maxQueued: 0,
711 failed: 0
712 },
713 runTime: {
714 history: expect.any(CircularArray)
715 },
716 waitTime: {
717 history: expect.any(CircularArray)
718 },
719 elu: {
720 idle: {
721 history: expect.any(CircularArray)
722 },
723 active: {
724 history: expect.any(CircularArray)
725 }
726 }
727 })
728 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
729 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
730 expect(workerNode.usage.runTime.history.length).toBe(0)
731 expect(workerNode.usage.waitTime.history.length).toBe(0)
732 expect(workerNode.usage.elu.idle.history.length).toBe(0)
733 expect(workerNode.usage.elu.active.history.length).toBe(0)
734 }
735 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
736 for (const workerNode of pool.workerNodes) {
737 expect(workerNode.usage).toStrictEqual({
738 tasks: {
739 executed: 0,
740 executing: 0,
741 queued: 0,
742 maxQueued: 0,
743 failed: 0
744 },
745 runTime: {
746 history: expect.any(CircularArray)
747 },
748 waitTime: {
749 history: expect.any(CircularArray)
750 },
751 elu: {
752 idle: {
753 history: expect.any(CircularArray)
754 },
755 active: {
756 history: expect.any(CircularArray)
757 }
758 }
759 })
760 expect(workerNode.usage.runTime.history.length).toBe(0)
761 expect(workerNode.usage.waitTime.history.length).toBe(0)
762 expect(workerNode.usage.elu.idle.history.length).toBe(0)
763 expect(workerNode.usage.elu.active.history.length).toBe(0)
764 }
765 await pool.destroy()
766 })
767
768 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
769 const pool = new DynamicClusterPool(
770 Math.floor(numberOfWorkers / 2),
771 numberOfWorkers,
772 './tests/worker-files/cluster/testWorker.js'
773 )
774 let poolInfo
775 let poolReady = 0
776 pool.emitter.on(PoolEvents.ready, (info) => {
777 ++poolReady
778 poolInfo = info
779 })
780 await waitPoolEvents(pool, PoolEvents.ready, 1)
781 expect(poolReady).toBe(1)
782 expect(poolInfo).toStrictEqual({
783 version,
784 type: PoolTypes.dynamic,
785 worker: WorkerTypes.cluster,
786 ready: true,
787 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
788 minSize: expect.any(Number),
789 maxSize: expect.any(Number),
790 workerNodes: expect.any(Number),
791 idleWorkerNodes: expect.any(Number),
792 busyWorkerNodes: expect.any(Number),
793 executedTasks: expect.any(Number),
794 executingTasks: expect.any(Number),
795 failedTasks: expect.any(Number)
796 })
797 await pool.destroy()
798 })
799
800 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
801 const pool = new FixedThreadPool(
802 numberOfWorkers,
803 './tests/worker-files/thread/testWorker.js'
804 )
805 const promises = new Set()
806 let poolBusy = 0
807 let poolInfo
808 pool.emitter.on(PoolEvents.busy, (info) => {
809 ++poolBusy
810 poolInfo = info
811 })
812 for (let i = 0; i < numberOfWorkers * 2; i++) {
813 promises.add(pool.execute())
814 }
815 await Promise.all(promises)
816 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
817 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
818 expect(poolBusy).toBe(numberOfWorkers + 1)
819 expect(poolInfo).toStrictEqual({
820 version,
821 type: PoolTypes.fixed,
822 worker: WorkerTypes.thread,
823 ready: expect.any(Boolean),
824 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
825 minSize: expect.any(Number),
826 maxSize: expect.any(Number),
827 workerNodes: expect.any(Number),
828 idleWorkerNodes: expect.any(Number),
829 busyWorkerNodes: expect.any(Number),
830 executedTasks: expect.any(Number),
831 executingTasks: expect.any(Number),
832 failedTasks: expect.any(Number)
833 })
834 await pool.destroy()
835 })
836
837 it("Verify that pool event emitter 'full' event can register a callback", async () => {
838 const pool = new DynamicThreadPool(
839 Math.floor(numberOfWorkers / 2),
840 numberOfWorkers,
841 './tests/worker-files/thread/testWorker.js'
842 )
843 const promises = new Set()
844 let poolFull = 0
845 let poolInfo
846 pool.emitter.on(PoolEvents.full, (info) => {
847 ++poolFull
848 poolInfo = info
849 })
850 for (let i = 0; i < numberOfWorkers * 2; i++) {
851 promises.add(pool.execute())
852 }
853 await Promise.all(promises)
854 expect(poolFull).toBe(1)
855 expect(poolInfo).toStrictEqual({
856 version,
857 type: PoolTypes.dynamic,
858 worker: WorkerTypes.thread,
859 ready: expect.any(Boolean),
860 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
861 minSize: expect.any(Number),
862 maxSize: expect.any(Number),
863 workerNodes: expect.any(Number),
864 idleWorkerNodes: expect.any(Number),
865 busyWorkerNodes: expect.any(Number),
866 executedTasks: expect.any(Number),
867 executingTasks: expect.any(Number),
868 failedTasks: expect.any(Number)
869 })
870 await pool.destroy()
871 })
872
873 it('Verify that listTaskFunctions() is working', async () => {
874 const dynamicThreadPool = new DynamicThreadPool(
875 Math.floor(numberOfWorkers / 2),
876 numberOfWorkers,
877 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
878 )
879 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
880 expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
881 'default',
882 'jsonIntegerSerialization',
883 'factorial',
884 'fibonacci'
885 ])
886 const fixedClusterPool = new FixedClusterPool(
887 numberOfWorkers,
888 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
889 )
890 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
891 expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
892 'default',
893 'jsonIntegerSerialization',
894 'factorial',
895 'fibonacci'
896 ])
897 })
898
899 it('Verify that multiple task functions worker is working', async () => {
900 const pool = new DynamicClusterPool(
901 Math.floor(numberOfWorkers / 2),
902 numberOfWorkers,
903 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
904 )
905 const data = { n: 10 }
906 const result0 = await pool.execute(data)
907 expect(result0).toStrictEqual({ ok: 1 })
908 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
909 expect(result1).toStrictEqual({ ok: 1 })
910 const result2 = await pool.execute(data, 'factorial')
911 expect(result2).toBe(3628800)
912 const result3 = await pool.execute(data, 'fibonacci')
913 expect(result3).toBe(55)
914 expect(pool.info.executingTasks).toBe(0)
915 expect(pool.info.executedTasks).toBe(4)
916 for (const workerNode of pool.workerNodes) {
917 expect(workerNode.info.taskFunctions).toStrictEqual([
918 'default',
919 'jsonIntegerSerialization',
920 'factorial',
921 'fibonacci'
922 ])
923 expect(workerNode.taskFunctionsUsage.size).toBe(3)
924 for (const name of pool.listTaskFunctions()) {
925 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
926 tasks: {
927 executed: expect.any(Number),
928 executing: expect.any(Number),
929 failed: 0,
930 queued: 0
931 },
932 runTime: {
933 history: expect.any(CircularArray)
934 },
935 waitTime: {
936 history: expect.any(CircularArray)
937 },
938 elu: {
939 idle: {
940 history: expect.any(CircularArray)
941 },
942 active: {
943 history: expect.any(CircularArray)
944 }
945 }
946 })
947 expect(
948 workerNode.getTaskFunctionWorkerUsage(name).tasks.executing
949 ).toBeGreaterThanOrEqual(0)
950 }
951 }
952 })
953 })