fix: fix pool readiness semantic
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicClusterPool,
4 DynamicThreadPool,
5 FixedClusterPool,
6 FixedThreadPool,
7 PoolEvents,
8 WorkerChoiceStrategies,
9 PoolTypes,
10 WorkerTypes
11 } = require('../../../lib')
12 const { CircularArray } = require('../../../lib/circular-array')
13 const { Queue } = require('../../../lib/queue')
14 const { version } = require('../../../package.json')
15 const { waitPoolEvents } = require('../../test-utils')
16
17 describe('Abstract pool test suite', () => {
18 const numberOfWorkers = 2
19 class StubPoolWithIsMain extends FixedThreadPool {
20 isMain () {
21 return false
22 }
23 }
24
25 it('Simulate pool creation from a non main thread/process', () => {
26 expect(
27 () =>
28 new StubPoolWithIsMain(
29 numberOfWorkers,
30 './tests/worker-files/thread/testWorker.js',
31 {
32 errorHandler: e => console.error(e)
33 }
34 )
35 ).toThrowError('Cannot start a pool from a worker!')
36 })
37
38 it('Verify that filePath is checked', () => {
39 const expectedError = new Error(
40 'Please specify a file with a worker implementation'
41 )
42 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
43 expectedError
44 )
45 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
46 expectedError
47 )
48 })
49
50 it('Verify that numberOfWorkers is checked', () => {
51 expect(() => new FixedThreadPool()).toThrowError(
52 'Cannot instantiate a pool without specifying the number of workers'
53 )
54 })
55
56 it('Verify that a negative number of workers is checked', () => {
57 expect(
58 () =>
59 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
60 ).toThrowError(
61 new RangeError(
62 'Cannot instantiate a pool with a negative number of workers'
63 )
64 )
65 })
66
67 it('Verify that a non integer number of workers is checked', () => {
68 expect(
69 () =>
70 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
71 ).toThrowError(
72 new TypeError(
73 'Cannot instantiate a pool with a non safe integer number of workers'
74 )
75 )
76 })
77
78 it('Verify that dynamic pool sizing is checked', () => {
79 expect(
80 () =>
81 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
82 ).toThrowError(
83 new RangeError(
84 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
85 )
86 )
87 expect(
88 () =>
89 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
90 ).toThrowError(
91 new RangeError(
92 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
93 )
94 )
95 expect(
96 () =>
97 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
98 ).toThrowError(
99 new RangeError(
100 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
101 )
102 )
103 })
104
105 it('Verify that pool options are checked', async () => {
106 let pool = new FixedThreadPool(
107 numberOfWorkers,
108 './tests/worker-files/thread/testWorker.js'
109 )
110 expect(pool.emitter).toBeDefined()
111 expect(pool.opts.enableEvents).toBe(true)
112 expect(pool.opts.restartWorkerOnError).toBe(true)
113 expect(pool.opts.enableTasksQueue).toBe(false)
114 expect(pool.opts.tasksQueueOptions).toBeUndefined()
115 expect(pool.opts.workerChoiceStrategy).toBe(
116 WorkerChoiceStrategies.ROUND_ROBIN
117 )
118 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
119 runTime: { median: false },
120 waitTime: { median: false },
121 elu: { median: false }
122 })
123 expect(pool.opts.messageHandler).toBeUndefined()
124 expect(pool.opts.errorHandler).toBeUndefined()
125 expect(pool.opts.onlineHandler).toBeUndefined()
126 expect(pool.opts.exitHandler).toBeUndefined()
127 await pool.destroy()
128 const testHandler = () => console.log('test handler executed')
129 pool = new FixedThreadPool(
130 numberOfWorkers,
131 './tests/worker-files/thread/testWorker.js',
132 {
133 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
134 workerChoiceStrategyOptions: {
135 runTime: { median: true },
136 weights: { 0: 300, 1: 200 }
137 },
138 enableEvents: false,
139 restartWorkerOnError: false,
140 enableTasksQueue: true,
141 tasksQueueOptions: { concurrency: 2 },
142 messageHandler: testHandler,
143 errorHandler: testHandler,
144 onlineHandler: testHandler,
145 exitHandler: testHandler
146 }
147 )
148 expect(pool.emitter).toBeUndefined()
149 expect(pool.opts.enableEvents).toBe(false)
150 expect(pool.opts.restartWorkerOnError).toBe(false)
151 expect(pool.opts.enableTasksQueue).toBe(true)
152 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
153 expect(pool.opts.workerChoiceStrategy).toBe(
154 WorkerChoiceStrategies.LEAST_USED
155 )
156 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
157 runTime: { median: true },
158 weights: { 0: 300, 1: 200 }
159 })
160 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
161 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
162 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
163 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
164 await pool.destroy()
165 })
166
167 it('Verify that pool options are validated', async () => {
168 expect(
169 () =>
170 new FixedThreadPool(
171 numberOfWorkers,
172 './tests/worker-files/thread/testWorker.js',
173 {
174 workerChoiceStrategy: 'invalidStrategy'
175 }
176 )
177 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
178 expect(
179 () =>
180 new FixedThreadPool(
181 numberOfWorkers,
182 './tests/worker-files/thread/testWorker.js',
183 {
184 workerChoiceStrategyOptions: 'invalidOptions'
185 }
186 )
187 ).toThrowError(
188 'Invalid worker choice strategy options: must be a plain object'
189 )
190 expect(
191 () =>
192 new FixedThreadPool(
193 numberOfWorkers,
194 './tests/worker-files/thread/testWorker.js',
195 {
196 workerChoiceStrategyOptions: { weights: {} }
197 }
198 )
199 ).toThrowError(
200 'Invalid worker choice strategy options: must have a weight for each worker node'
201 )
202 expect(
203 () =>
204 new FixedThreadPool(
205 numberOfWorkers,
206 './tests/worker-files/thread/testWorker.js',
207 {
208 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
209 }
210 )
211 ).toThrowError(
212 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
213 )
214 expect(
215 () =>
216 new FixedThreadPool(
217 numberOfWorkers,
218 './tests/worker-files/thread/testWorker.js',
219 {
220 enableTasksQueue: true,
221 tasksQueueOptions: { concurrency: 0 }
222 }
223 )
224 ).toThrowError("Invalid worker tasks concurrency '0'")
225 expect(
226 () =>
227 new FixedThreadPool(
228 numberOfWorkers,
229 './tests/worker-files/thread/testWorker.js',
230 {
231 enableTasksQueue: true,
232 tasksQueueOptions: 'invalidTasksQueueOptions'
233 }
234 )
235 ).toThrowError('Invalid tasks queue options: must be a plain object')
236 expect(
237 () =>
238 new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.js',
241 {
242 enableTasksQueue: true,
243 tasksQueueOptions: { concurrency: 0.2 }
244 }
245 )
246 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
247 })
248
249 it('Verify that pool worker choice strategy options can be set', async () => {
250 const pool = new FixedThreadPool(
251 numberOfWorkers,
252 './tests/worker-files/thread/testWorker.js',
253 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
254 )
255 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
256 runTime: { median: false },
257 waitTime: { median: false },
258 elu: { median: false }
259 })
260 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
261 .workerChoiceStrategies) {
262 expect(workerChoiceStrategy.opts).toStrictEqual({
263 runTime: { median: false },
264 waitTime: { median: false },
265 elu: { median: false }
266 })
267 }
268 expect(
269 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
270 ).toStrictEqual({
271 runTime: {
272 aggregate: true,
273 average: true,
274 median: false
275 },
276 waitTime: {
277 aggregate: false,
278 average: false,
279 median: false
280 },
281 elu: {
282 aggregate: true,
283 average: true,
284 median: false
285 }
286 })
287 pool.setWorkerChoiceStrategyOptions({
288 runTime: { median: true },
289 elu: { median: true }
290 })
291 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
292 runTime: { median: true },
293 elu: { median: true }
294 })
295 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
296 .workerChoiceStrategies) {
297 expect(workerChoiceStrategy.opts).toStrictEqual({
298 runTime: { median: true },
299 elu: { median: true }
300 })
301 }
302 expect(
303 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 ).toStrictEqual({
305 runTime: {
306 aggregate: true,
307 average: false,
308 median: true
309 },
310 waitTime: {
311 aggregate: false,
312 average: false,
313 median: false
314 },
315 elu: {
316 aggregate: true,
317 average: false,
318 median: true
319 }
320 })
321 pool.setWorkerChoiceStrategyOptions({
322 runTime: { median: false },
323 elu: { median: false }
324 })
325 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
326 runTime: { median: false },
327 elu: { median: false }
328 })
329 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
330 .workerChoiceStrategies) {
331 expect(workerChoiceStrategy.opts).toStrictEqual({
332 runTime: { median: false },
333 elu: { median: false }
334 })
335 }
336 expect(
337 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
338 ).toStrictEqual({
339 runTime: {
340 aggregate: true,
341 average: true,
342 median: false
343 },
344 waitTime: {
345 aggregate: false,
346 average: false,
347 median: false
348 },
349 elu: {
350 aggregate: true,
351 average: true,
352 median: false
353 }
354 })
355 expect(() =>
356 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
357 ).toThrowError(
358 'Invalid worker choice strategy options: must be a plain object'
359 )
360 expect(() =>
361 pool.setWorkerChoiceStrategyOptions({ weights: {} })
362 ).toThrowError(
363 'Invalid worker choice strategy options: must have a weight for each worker node'
364 )
365 expect(() =>
366 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
367 ).toThrowError(
368 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
369 )
370 await pool.destroy()
371 })
372
373 it('Verify that pool tasks queue can be enabled/disabled', async () => {
374 const pool = new FixedThreadPool(
375 numberOfWorkers,
376 './tests/worker-files/thread/testWorker.js'
377 )
378 expect(pool.opts.enableTasksQueue).toBe(false)
379 expect(pool.opts.tasksQueueOptions).toBeUndefined()
380 pool.enableTasksQueue(true)
381 expect(pool.opts.enableTasksQueue).toBe(true)
382 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
383 pool.enableTasksQueue(true, { concurrency: 2 })
384 expect(pool.opts.enableTasksQueue).toBe(true)
385 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
386 pool.enableTasksQueue(false)
387 expect(pool.opts.enableTasksQueue).toBe(false)
388 expect(pool.opts.tasksQueueOptions).toBeUndefined()
389 await pool.destroy()
390 })
391
392 it('Verify that pool tasks queue options can be set', async () => {
393 const pool = new FixedThreadPool(
394 numberOfWorkers,
395 './tests/worker-files/thread/testWorker.js',
396 { enableTasksQueue: true }
397 )
398 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
399 pool.setTasksQueueOptions({ concurrency: 2 })
400 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
401 expect(() =>
402 pool.setTasksQueueOptions('invalidTasksQueueOptions')
403 ).toThrowError('Invalid tasks queue options: must be a plain object')
404 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
405 "Invalid worker tasks concurrency '0'"
406 )
407 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
408 'Invalid worker tasks concurrency: must be an integer'
409 )
410 await pool.destroy()
411 })
412
413 it('Verify that pool info is set', async () => {
414 let pool = new FixedThreadPool(
415 numberOfWorkers,
416 './tests/worker-files/thread/testWorker.js'
417 )
418 expect(pool.info).toStrictEqual({
419 version,
420 type: PoolTypes.fixed,
421 worker: WorkerTypes.thread,
422 ready: false,
423 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
424 minSize: numberOfWorkers,
425 maxSize: numberOfWorkers,
426 workerNodes: numberOfWorkers,
427 idleWorkerNodes: numberOfWorkers,
428 busyWorkerNodes: 0,
429 executedTasks: 0,
430 executingTasks: 0,
431 queuedTasks: 0,
432 maxQueuedTasks: 0,
433 failedTasks: 0
434 })
435 await pool.destroy()
436 pool = new DynamicClusterPool(
437 Math.floor(numberOfWorkers / 2),
438 numberOfWorkers,
439 './tests/worker-files/cluster/testWorker.js'
440 )
441 expect(pool.info).toStrictEqual({
442 version,
443 type: PoolTypes.dynamic,
444 worker: WorkerTypes.cluster,
445 ready: false,
446 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
447 minSize: Math.floor(numberOfWorkers / 2),
448 maxSize: numberOfWorkers,
449 workerNodes: Math.floor(numberOfWorkers / 2),
450 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
451 busyWorkerNodes: 0,
452 executedTasks: 0,
453 executingTasks: 0,
454 queuedTasks: 0,
455 maxQueuedTasks: 0,
456 failedTasks: 0
457 })
458 await pool.destroy()
459 })
460
461 it('Verify that pool worker tasks usage are initialized', async () => {
462 const pool = new FixedClusterPool(
463 numberOfWorkers,
464 './tests/worker-files/cluster/testWorker.js'
465 )
466 for (const workerNode of pool.workerNodes) {
467 expect(workerNode.usage).toStrictEqual({
468 tasks: {
469 executed: 0,
470 executing: 0,
471 queued: 0,
472 maxQueued: 0,
473 failed: 0
474 },
475 runTime: {
476 history: expect.any(CircularArray)
477 },
478 waitTime: {
479 history: expect.any(CircularArray)
480 },
481 elu: {
482 idle: {
483 history: expect.any(CircularArray)
484 },
485 active: {
486 history: expect.any(CircularArray)
487 }
488 }
489 })
490 }
491 await pool.destroy()
492 })
493
494 it('Verify that pool worker tasks queue are initialized', async () => {
495 let pool = new FixedClusterPool(
496 numberOfWorkers,
497 './tests/worker-files/cluster/testWorker.js'
498 )
499 for (const workerNode of pool.workerNodes) {
500 expect(workerNode.tasksQueue).toBeDefined()
501 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
502 expect(workerNode.tasksQueue.size).toBe(0)
503 expect(workerNode.tasksQueue.maxSize).toBe(0)
504 }
505 await pool.destroy()
506 pool = new DynamicThreadPool(
507 Math.floor(numberOfWorkers / 2),
508 numberOfWorkers,
509 './tests/worker-files/thread/testWorker.js'
510 )
511 for (const workerNode of pool.workerNodes) {
512 expect(workerNode.tasksQueue).toBeDefined()
513 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
514 expect(workerNode.tasksQueue.size).toBe(0)
515 expect(workerNode.tasksQueue.maxSize).toBe(0)
516 }
517 })
518
519 it('Verify that pool worker info are initialized', async () => {
520 let pool = new FixedClusterPool(
521 numberOfWorkers,
522 './tests/worker-files/cluster/testWorker.js'
523 )
524 for (const workerNode of pool.workerNodes) {
525 expect(workerNode.info).toStrictEqual({
526 id: expect.any(Number),
527 type: WorkerTypes.cluster,
528 dynamic: false,
529 ready: false
530 })
531 }
532 await pool.destroy()
533 pool = new DynamicThreadPool(
534 Math.floor(numberOfWorkers / 2),
535 numberOfWorkers,
536 './tests/worker-files/thread/testWorker.js'
537 )
538 for (const workerNode of pool.workerNodes) {
539 expect(workerNode.info).toStrictEqual({
540 id: expect.any(Number),
541 type: WorkerTypes.thread,
542 dynamic: false,
543 ready: false
544 })
545 }
546 })
547
548 it('Verify that pool worker tasks usage are computed', async () => {
549 const pool = new FixedClusterPool(
550 numberOfWorkers,
551 './tests/worker-files/cluster/testWorker.js'
552 )
553 const promises = new Set()
554 const maxMultiplier = 2
555 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
556 promises.add(pool.execute())
557 }
558 for (const workerNode of pool.workerNodes) {
559 expect(workerNode.usage).toStrictEqual({
560 tasks: {
561 executed: 0,
562 executing: maxMultiplier,
563 queued: 0,
564 maxQueued: 0,
565 failed: 0
566 },
567 runTime: {
568 history: expect.any(CircularArray)
569 },
570 waitTime: {
571 history: expect.any(CircularArray)
572 },
573 elu: {
574 idle: {
575 history: expect.any(CircularArray)
576 },
577 active: {
578 history: expect.any(CircularArray)
579 }
580 }
581 })
582 }
583 await Promise.all(promises)
584 for (const workerNode of pool.workerNodes) {
585 expect(workerNode.usage).toStrictEqual({
586 tasks: {
587 executed: maxMultiplier,
588 executing: 0,
589 queued: 0,
590 maxQueued: 0,
591 failed: 0
592 },
593 runTime: {
594 history: expect.any(CircularArray)
595 },
596 waitTime: {
597 history: expect.any(CircularArray)
598 },
599 elu: {
600 idle: {
601 history: expect.any(CircularArray)
602 },
603 active: {
604 history: expect.any(CircularArray)
605 }
606 }
607 })
608 }
609 await pool.destroy()
610 })
611
612 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
613 const pool = new DynamicThreadPool(
614 Math.floor(numberOfWorkers / 2),
615 numberOfWorkers,
616 './tests/worker-files/thread/testWorker.js'
617 )
618 const promises = new Set()
619 const maxMultiplier = 2
620 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
621 promises.add(pool.execute())
622 }
623 await Promise.all(promises)
624 for (const workerNode of pool.workerNodes) {
625 expect(workerNode.usage).toStrictEqual({
626 tasks: {
627 executed: expect.any(Number),
628 executing: 0,
629 queued: 0,
630 maxQueued: 0,
631 failed: 0
632 },
633 runTime: {
634 history: expect.any(CircularArray)
635 },
636 waitTime: {
637 history: expect.any(CircularArray)
638 },
639 elu: {
640 idle: {
641 history: expect.any(CircularArray)
642 },
643 active: {
644 history: expect.any(CircularArray)
645 }
646 }
647 })
648 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
649 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
650 }
651 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
652 for (const workerNode of pool.workerNodes) {
653 expect(workerNode.usage).toStrictEqual({
654 tasks: {
655 executed: 0,
656 executing: 0,
657 queued: 0,
658 maxQueued: 0,
659 failed: 0
660 },
661 runTime: {
662 history: expect.any(CircularArray)
663 },
664 waitTime: {
665 history: expect.any(CircularArray)
666 },
667 elu: {
668 idle: {
669 history: expect.any(CircularArray)
670 },
671 active: {
672 history: expect.any(CircularArray)
673 }
674 }
675 })
676 expect(workerNode.usage.runTime.history.length).toBe(0)
677 expect(workerNode.usage.waitTime.history.length).toBe(0)
678 }
679 await pool.destroy()
680 })
681
682 it("Verify that pool event emitter 'full' event can register a callback", async () => {
683 const pool = new DynamicThreadPool(
684 Math.floor(numberOfWorkers / 2),
685 numberOfWorkers,
686 './tests/worker-files/thread/testWorker.js'
687 )
688 const promises = new Set()
689 let poolFull = 0
690 let poolInfo
691 pool.emitter.on(PoolEvents.full, info => {
692 ++poolFull
693 poolInfo = info
694 })
695 for (let i = 0; i < numberOfWorkers * 2; i++) {
696 promises.add(pool.execute())
697 }
698 await Promise.all(promises)
699 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
700 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
701 expect(poolFull).toBe(numberOfWorkers * 2 - 1)
702 expect(poolInfo).toStrictEqual({
703 version,
704 type: PoolTypes.dynamic,
705 worker: WorkerTypes.thread,
706 ready: expect.any(Boolean),
707 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
708 minSize: expect.any(Number),
709 maxSize: expect.any(Number),
710 workerNodes: expect.any(Number),
711 idleWorkerNodes: expect.any(Number),
712 busyWorkerNodes: expect.any(Number),
713 executedTasks: expect.any(Number),
714 executingTasks: expect.any(Number),
715 queuedTasks: expect.any(Number),
716 maxQueuedTasks: expect.any(Number),
717 failedTasks: expect.any(Number)
718 })
719 await pool.destroy()
720 })
721
722 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
723 const pool = new DynamicClusterPool(
724 Math.floor(numberOfWorkers / 2),
725 numberOfWorkers,
726 './tests/worker-files/cluster/testWorker.js'
727 )
728 let poolReady = 0
729 let poolInfo
730 pool.emitter.on(PoolEvents.ready, info => {
731 ++poolReady
732 poolInfo = info
733 })
734 await waitPoolEvents(pool, PoolEvents.ready, 1)
735 expect(poolReady).toBe(1)
736 expect(poolInfo).toStrictEqual({
737 version,
738 type: PoolTypes.dynamic,
739 worker: WorkerTypes.cluster,
740 ready: true,
741 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
742 minSize: expect.any(Number),
743 maxSize: expect.any(Number),
744 workerNodes: expect.any(Number),
745 idleWorkerNodes: expect.any(Number),
746 busyWorkerNodes: expect.any(Number),
747 executedTasks: expect.any(Number),
748 executingTasks: expect.any(Number),
749 queuedTasks: expect.any(Number),
750 maxQueuedTasks: expect.any(Number),
751 failedTasks: expect.any(Number)
752 })
753 await pool.destroy()
754 })
755
756 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
757 const pool = new FixedThreadPool(
758 numberOfWorkers,
759 './tests/worker-files/thread/testWorker.js'
760 )
761 const promises = new Set()
762 let poolBusy = 0
763 let poolInfo
764 pool.emitter.on(PoolEvents.busy, info => {
765 ++poolBusy
766 poolInfo = info
767 })
768 for (let i = 0; i < numberOfWorkers * 2; i++) {
769 promises.add(pool.execute())
770 }
771 await Promise.all(promises)
772 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
773 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
774 expect(poolBusy).toBe(numberOfWorkers + 1)
775 expect(poolInfo).toStrictEqual({
776 version,
777 type: PoolTypes.fixed,
778 worker: WorkerTypes.thread,
779 ready: expect.any(Boolean),
780 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
781 minSize: expect.any(Number),
782 maxSize: expect.any(Number),
783 workerNodes: expect.any(Number),
784 idleWorkerNodes: expect.any(Number),
785 busyWorkerNodes: expect.any(Number),
786 executedTasks: expect.any(Number),
787 executingTasks: expect.any(Number),
788 queuedTasks: expect.any(Number),
789 maxQueuedTasks: expect.any(Number),
790 failedTasks: expect.any(Number)
791 })
792 await pool.destroy()
793 })
794
795 it('Verify that multiple tasks worker is working', async () => {
796 const pool = new DynamicClusterPool(
797 Math.floor(numberOfWorkers / 2),
798 numberOfWorkers,
799 './tests/worker-files/cluster/testMultiTasksWorker.js'
800 )
801 const data = { n: 10 }
802 const result0 = await pool.execute(data)
803 expect(result0).toStrictEqual({ ok: 1 })
804 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
805 expect(result1).toStrictEqual({ ok: 1 })
806 const result2 = await pool.execute(data, 'factorial')
807 expect(result2).toBe(3628800)
808 const result3 = await pool.execute(data, 'fibonacci')
809 expect(result3).toBe(55)
810 })
811 })