build: switch from prettier to rome as code formatter
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { MessageChannel } = require('worker_threads')
2 const { expect } = require('expect')
3 const {
4 DynamicClusterPool,
5 DynamicThreadPool,
6 FixedClusterPool,
7 FixedThreadPool,
8 PoolEvents,
9 PoolTypes,
10 WorkerChoiceStrategies,
11 WorkerTypes
12 } = require('../../../lib')
13 const { CircularArray } = require('../../../lib/circular-array')
14 const { Queue } = require('../../../lib/queue')
15 const { version } = require('../../../package.json')
16 const { waitPoolEvents } = require('../../test-utils')
17
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers = 2
20 class StubPoolWithIsMain extends FixedThreadPool {
21 isMain () {
22 return false
23 }
24 }
25
26 it('Simulate pool creation from a non main thread/process', () => {
27 expect(
28 () =>
29 new StubPoolWithIsMain(
30 numberOfWorkers,
31 './tests/worker-files/thread/testWorker.js',
32 {
33 errorHandler: (e) => console.error(e)
34 }
35 )
36 ).toThrowError('Cannot start a pool from a worker!')
37 })
38
39 it('Verify that filePath is checked', () => {
40 const expectedError = new Error(
41 'Please specify a file with a worker implementation'
42 )
43 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
44 expectedError
45 )
46 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
47 expectedError
48 )
49 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
50 expectedError
51 )
52 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
53 expectedError
54 )
55 expect(
56 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
57 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
58 })
59
60 it('Verify that numberOfWorkers is checked', () => {
61 expect(() => new FixedThreadPool()).toThrowError(
62 'Cannot instantiate a pool without specifying the number of workers'
63 )
64 })
65
66 it('Verify that a negative number of workers is checked', () => {
67 expect(
68 () =>
69 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
70 ).toThrowError(
71 new RangeError(
72 'Cannot instantiate a pool with a negative number of workers'
73 )
74 )
75 })
76
77 it('Verify that a non integer number of workers is checked', () => {
78 expect(
79 () =>
80 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
81 ).toThrowError(
82 new TypeError(
83 'Cannot instantiate a pool with a non safe integer number of workers'
84 )
85 )
86 })
87
88 it('Verify that dynamic pool sizing is checked', () => {
89 expect(
90 () =>
91 new DynamicClusterPool(
92 1,
93 undefined,
94 './tests/worker-files/cluster/testWorker.js'
95 )
96 ).toThrowError(
97 new TypeError(
98 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
99 )
100 )
101 expect(
102 () =>
103 new DynamicThreadPool(
104 0.5,
105 1,
106 './tests/worker-files/thread/testWorker.js'
107 )
108 ).toThrowError(
109 new TypeError(
110 'Cannot instantiate a pool with a non safe integer number of workers'
111 )
112 )
113 expect(
114 () =>
115 new DynamicClusterPool(
116 0,
117 0.5,
118 './tests/worker-files/cluster/testWorker.js'
119 )
120 ).toThrowError(
121 new TypeError(
122 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
123 )
124 )
125 expect(
126 () =>
127 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
128 ).toThrowError(
129 new RangeError(
130 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
131 )
132 )
133 expect(
134 () =>
135 new DynamicClusterPool(
136 1,
137 1,
138 './tests/worker-files/cluster/testWorker.js'
139 )
140 ).toThrowError(
141 new RangeError(
142 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
143 )
144 )
145 expect(
146 () =>
147 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
148 ).toThrowError(
149 new RangeError(
150 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
151 )
152 )
153 })
154
155 it('Verify that pool options are checked', async () => {
156 let pool = new FixedThreadPool(
157 numberOfWorkers,
158 './tests/worker-files/thread/testWorker.js'
159 )
160 expect(pool.emitter).toBeDefined()
161 expect(pool.opts.enableEvents).toBe(true)
162 expect(pool.opts.restartWorkerOnError).toBe(true)
163 expect(pool.opts.enableTasksQueue).toBe(false)
164 expect(pool.opts.tasksQueueOptions).toBeUndefined()
165 expect(pool.opts.workerChoiceStrategy).toBe(
166 WorkerChoiceStrategies.ROUND_ROBIN
167 )
168 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
169 runTime: { median: false },
170 waitTime: { median: false },
171 elu: { median: false }
172 })
173 expect(pool.opts.messageHandler).toBeUndefined()
174 expect(pool.opts.errorHandler).toBeUndefined()
175 expect(pool.opts.onlineHandler).toBeUndefined()
176 expect(pool.opts.exitHandler).toBeUndefined()
177 await pool.destroy()
178 const testHandler = () => console.info('test handler executed')
179 pool = new FixedThreadPool(
180 numberOfWorkers,
181 './tests/worker-files/thread/testWorker.js',
182 {
183 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
184 workerChoiceStrategyOptions: {
185 runTime: { median: true },
186 weights: { 0: 300, 1: 200 }
187 },
188 enableEvents: false,
189 restartWorkerOnError: false,
190 enableTasksQueue: true,
191 tasksQueueOptions: { concurrency: 2 },
192 messageHandler: testHandler,
193 errorHandler: testHandler,
194 onlineHandler: testHandler,
195 exitHandler: testHandler
196 }
197 )
198 expect(pool.emitter).toBeUndefined()
199 expect(pool.opts.enableEvents).toBe(false)
200 expect(pool.opts.restartWorkerOnError).toBe(false)
201 expect(pool.opts.enableTasksQueue).toBe(true)
202 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
203 expect(pool.opts.workerChoiceStrategy).toBe(
204 WorkerChoiceStrategies.LEAST_USED
205 )
206 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
207 runTime: { median: true },
208 weights: { 0: 300, 1: 200 }
209 })
210 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
211 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
212 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
213 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
214 await pool.destroy()
215 })
216
217 it('Verify that pool options are validated', async () => {
218 expect(
219 () =>
220 new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.js',
223 {
224 workerChoiceStrategy: 'invalidStrategy'
225 }
226 )
227 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
228 expect(
229 () =>
230 new FixedThreadPool(
231 numberOfWorkers,
232 './tests/worker-files/thread/testWorker.js',
233 {
234 workerChoiceStrategyOptions: 'invalidOptions'
235 }
236 )
237 ).toThrowError(
238 'Invalid worker choice strategy options: must be a plain object'
239 )
240 expect(
241 () =>
242 new FixedThreadPool(
243 numberOfWorkers,
244 './tests/worker-files/thread/testWorker.js',
245 {
246 workerChoiceStrategyOptions: { weights: {} }
247 }
248 )
249 ).toThrowError(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 expect(
253 () =>
254 new FixedThreadPool(
255 numberOfWorkers,
256 './tests/worker-files/thread/testWorker.js',
257 {
258 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
259 }
260 )
261 ).toThrowError(
262 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
263 )
264 expect(
265 () =>
266 new FixedThreadPool(
267 numberOfWorkers,
268 './tests/worker-files/thread/testWorker.js',
269 {
270 enableTasksQueue: true,
271 tasksQueueOptions: { concurrency: 0 }
272 }
273 )
274 ).toThrowError("Invalid worker tasks concurrency '0'")
275 expect(
276 () =>
277 new FixedThreadPool(
278 numberOfWorkers,
279 './tests/worker-files/thread/testWorker.js',
280 {
281 enableTasksQueue: true,
282 tasksQueueOptions: 'invalidTasksQueueOptions'
283 }
284 )
285 ).toThrowError('Invalid tasks queue options: must be a plain object')
286 expect(
287 () =>
288 new FixedThreadPool(
289 numberOfWorkers,
290 './tests/worker-files/thread/testWorker.js',
291 {
292 enableTasksQueue: true,
293 tasksQueueOptions: { concurrency: 0.2 }
294 }
295 )
296 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
297 })
298
299 it('Verify that pool worker choice strategy options can be set', async () => {
300 const pool = new FixedThreadPool(
301 numberOfWorkers,
302 './tests/worker-files/thread/testWorker.js',
303 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
304 )
305 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
306 runTime: { median: false },
307 waitTime: { median: false },
308 elu: { median: false }
309 })
310 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
311 .workerChoiceStrategies) {
312 expect(workerChoiceStrategy.opts).toStrictEqual({
313 runTime: { median: false },
314 waitTime: { median: false },
315 elu: { median: false }
316 })
317 }
318 expect(
319 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 ).toStrictEqual({
321 runTime: {
322 aggregate: true,
323 average: true,
324 median: false
325 },
326 waitTime: {
327 aggregate: false,
328 average: false,
329 median: false
330 },
331 elu: {
332 aggregate: true,
333 average: true,
334 median: false
335 }
336 })
337 pool.setWorkerChoiceStrategyOptions({
338 runTime: { median: true },
339 elu: { median: true }
340 })
341 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
342 runTime: { median: true },
343 elu: { median: true }
344 })
345 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
346 .workerChoiceStrategies) {
347 expect(workerChoiceStrategy.opts).toStrictEqual({
348 runTime: { median: true },
349 elu: { median: true }
350 })
351 }
352 expect(
353 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
354 ).toStrictEqual({
355 runTime: {
356 aggregate: true,
357 average: false,
358 median: true
359 },
360 waitTime: {
361 aggregate: false,
362 average: false,
363 median: false
364 },
365 elu: {
366 aggregate: true,
367 average: false,
368 median: true
369 }
370 })
371 pool.setWorkerChoiceStrategyOptions({
372 runTime: { median: false },
373 elu: { median: false }
374 })
375 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
376 runTime: { median: false },
377 elu: { median: false }
378 })
379 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
380 .workerChoiceStrategies) {
381 expect(workerChoiceStrategy.opts).toStrictEqual({
382 runTime: { median: false },
383 elu: { median: false }
384 })
385 }
386 expect(
387 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 ).toStrictEqual({
389 runTime: {
390 aggregate: true,
391 average: true,
392 median: false
393 },
394 waitTime: {
395 aggregate: false,
396 average: false,
397 median: false
398 },
399 elu: {
400 aggregate: true,
401 average: true,
402 median: false
403 }
404 })
405 expect(() =>
406 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
407 ).toThrowError(
408 'Invalid worker choice strategy options: must be a plain object'
409 )
410 expect(() =>
411 pool.setWorkerChoiceStrategyOptions({ weights: {} })
412 ).toThrowError(
413 'Invalid worker choice strategy options: must have a weight for each worker node'
414 )
415 expect(() =>
416 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
417 ).toThrowError(
418 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
419 )
420 await pool.destroy()
421 })
422
423 it('Verify that pool tasks queue can be enabled/disabled', async () => {
424 const pool = new FixedThreadPool(
425 numberOfWorkers,
426 './tests/worker-files/thread/testWorker.js'
427 )
428 expect(pool.opts.enableTasksQueue).toBe(false)
429 expect(pool.opts.tasksQueueOptions).toBeUndefined()
430 pool.enableTasksQueue(true)
431 expect(pool.opts.enableTasksQueue).toBe(true)
432 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
433 pool.enableTasksQueue(true, { concurrency: 2 })
434 expect(pool.opts.enableTasksQueue).toBe(true)
435 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
436 pool.enableTasksQueue(false)
437 expect(pool.opts.enableTasksQueue).toBe(false)
438 expect(pool.opts.tasksQueueOptions).toBeUndefined()
439 await pool.destroy()
440 })
441
442 it('Verify that pool tasks queue options can be set', async () => {
443 const pool = new FixedThreadPool(
444 numberOfWorkers,
445 './tests/worker-files/thread/testWorker.js',
446 { enableTasksQueue: true }
447 )
448 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
449 pool.setTasksQueueOptions({ concurrency: 2 })
450 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
451 expect(() =>
452 pool.setTasksQueueOptions('invalidTasksQueueOptions')
453 ).toThrowError('Invalid tasks queue options: must be a plain object')
454 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
455 "Invalid worker tasks concurrency '0'"
456 )
457 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
458 'Invalid worker tasks concurrency: must be an integer'
459 )
460 await pool.destroy()
461 })
462
463 it('Verify that pool info is set', async () => {
464 let pool = new FixedThreadPool(
465 numberOfWorkers,
466 './tests/worker-files/thread/testWorker.js'
467 )
468 expect(pool.info).toStrictEqual({
469 version,
470 type: PoolTypes.fixed,
471 worker: WorkerTypes.thread,
472 ready: true,
473 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
474 minSize: numberOfWorkers,
475 maxSize: numberOfWorkers,
476 workerNodes: numberOfWorkers,
477 idleWorkerNodes: numberOfWorkers,
478 busyWorkerNodes: 0,
479 executedTasks: 0,
480 executingTasks: 0,
481 failedTasks: 0
482 })
483 await pool.destroy()
484 pool = new DynamicClusterPool(
485 Math.floor(numberOfWorkers / 2),
486 numberOfWorkers,
487 './tests/worker-files/cluster/testWorker.js'
488 )
489 expect(pool.info).toStrictEqual({
490 version,
491 type: PoolTypes.dynamic,
492 worker: WorkerTypes.cluster,
493 ready: true,
494 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
495 minSize: Math.floor(numberOfWorkers / 2),
496 maxSize: numberOfWorkers,
497 workerNodes: Math.floor(numberOfWorkers / 2),
498 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
499 busyWorkerNodes: 0,
500 executedTasks: 0,
501 executingTasks: 0,
502 failedTasks: 0
503 })
504 await pool.destroy()
505 })
506
507 it('Verify that pool worker tasks usage are initialized', async () => {
508 const pool = new FixedClusterPool(
509 numberOfWorkers,
510 './tests/worker-files/cluster/testWorker.js'
511 )
512 for (const workerNode of pool.workerNodes) {
513 expect(workerNode.usage).toStrictEqual({
514 tasks: {
515 executed: 0,
516 executing: 0,
517 queued: 0,
518 maxQueued: 0,
519 failed: 0
520 },
521 runTime: {
522 history: expect.any(CircularArray)
523 },
524 waitTime: {
525 history: expect.any(CircularArray)
526 },
527 elu: {
528 idle: {
529 history: expect.any(CircularArray)
530 },
531 active: {
532 history: expect.any(CircularArray)
533 }
534 }
535 })
536 }
537 await pool.destroy()
538 })
539
540 it('Verify that pool worker tasks queue are initialized', async () => {
541 let pool = new FixedClusterPool(
542 numberOfWorkers,
543 './tests/worker-files/cluster/testWorker.js'
544 )
545 for (const workerNode of pool.workerNodes) {
546 expect(workerNode.tasksQueue).toBeDefined()
547 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
548 expect(workerNode.tasksQueue.size).toBe(0)
549 expect(workerNode.tasksQueue.maxSize).toBe(0)
550 }
551 await pool.destroy()
552 pool = new DynamicThreadPool(
553 Math.floor(numberOfWorkers / 2),
554 numberOfWorkers,
555 './tests/worker-files/thread/testWorker.js'
556 )
557 for (const workerNode of pool.workerNodes) {
558 expect(workerNode.tasksQueue).toBeDefined()
559 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
560 expect(workerNode.tasksQueue.size).toBe(0)
561 expect(workerNode.tasksQueue.maxSize).toBe(0)
562 }
563 })
564
565 it('Verify that pool worker info are initialized', async () => {
566 let pool = new FixedClusterPool(
567 numberOfWorkers,
568 './tests/worker-files/cluster/testWorker.js'
569 )
570 for (const workerNode of pool.workerNodes) {
571 expect(workerNode.info).toStrictEqual({
572 id: expect.any(Number),
573 type: WorkerTypes.cluster,
574 dynamic: false,
575 ready: true
576 })
577 }
578 await pool.destroy()
579 pool = new DynamicThreadPool(
580 Math.floor(numberOfWorkers / 2),
581 numberOfWorkers,
582 './tests/worker-files/thread/testWorker.js'
583 )
584 for (const workerNode of pool.workerNodes) {
585 expect(workerNode.info).toStrictEqual({
586 id: expect.any(Number),
587 type: WorkerTypes.thread,
588 dynamic: false,
589 ready: true,
590 messageChannel: expect.any(MessageChannel)
591 })
592 }
593 })
594
595 it('Verify that pool worker tasks usage are computed', async () => {
596 const pool = new FixedClusterPool(
597 numberOfWorkers,
598 './tests/worker-files/cluster/testWorker.js'
599 )
600 const promises = new Set()
601 const maxMultiplier = 2
602 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
603 promises.add(pool.execute())
604 }
605 for (const workerNode of pool.workerNodes) {
606 expect(workerNode.usage).toStrictEqual({
607 tasks: {
608 executed: 0,
609 executing: maxMultiplier,
610 queued: 0,
611 maxQueued: 0,
612 failed: 0
613 },
614 runTime: {
615 history: expect.any(CircularArray)
616 },
617 waitTime: {
618 history: expect.any(CircularArray)
619 },
620 elu: {
621 idle: {
622 history: expect.any(CircularArray)
623 },
624 active: {
625 history: expect.any(CircularArray)
626 }
627 }
628 })
629 }
630 await Promise.all(promises)
631 for (const workerNode of pool.workerNodes) {
632 expect(workerNode.usage).toStrictEqual({
633 tasks: {
634 executed: maxMultiplier,
635 executing: 0,
636 queued: 0,
637 maxQueued: 0,
638 failed: 0
639 },
640 runTime: {
641 history: expect.any(CircularArray)
642 },
643 waitTime: {
644 history: expect.any(CircularArray)
645 },
646 elu: {
647 idle: {
648 history: expect.any(CircularArray)
649 },
650 active: {
651 history: expect.any(CircularArray)
652 }
653 }
654 })
655 }
656 await pool.destroy()
657 })
658
659 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
660 const pool = new DynamicThreadPool(
661 Math.floor(numberOfWorkers / 2),
662 numberOfWorkers,
663 './tests/worker-files/thread/testWorker.js'
664 )
665 const promises = new Set()
666 const maxMultiplier = 2
667 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
668 promises.add(pool.execute())
669 }
670 await Promise.all(promises)
671 for (const workerNode of pool.workerNodes) {
672 expect(workerNode.usage).toStrictEqual({
673 tasks: {
674 executed: expect.any(Number),
675 executing: 0,
676 queued: 0,
677 maxQueued: 0,
678 failed: 0
679 },
680 runTime: {
681 history: expect.any(CircularArray)
682 },
683 waitTime: {
684 history: expect.any(CircularArray)
685 },
686 elu: {
687 idle: {
688 history: expect.any(CircularArray)
689 },
690 active: {
691 history: expect.any(CircularArray)
692 }
693 }
694 })
695 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
696 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
697 expect(workerNode.usage.runTime.history.length).toBe(0)
698 expect(workerNode.usage.waitTime.history.length).toBe(0)
699 expect(workerNode.usage.elu.idle.history.length).toBe(0)
700 expect(workerNode.usage.elu.active.history.length).toBe(0)
701 }
702 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
703 for (const workerNode of pool.workerNodes) {
704 expect(workerNode.usage).toStrictEqual({
705 tasks: {
706 executed: 0,
707 executing: 0,
708 queued: 0,
709 maxQueued: 0,
710 failed: 0
711 },
712 runTime: {
713 history: expect.any(CircularArray)
714 },
715 waitTime: {
716 history: expect.any(CircularArray)
717 },
718 elu: {
719 idle: {
720 history: expect.any(CircularArray)
721 },
722 active: {
723 history: expect.any(CircularArray)
724 }
725 }
726 })
727 expect(workerNode.usage.runTime.history.length).toBe(0)
728 expect(workerNode.usage.waitTime.history.length).toBe(0)
729 expect(workerNode.usage.elu.idle.history.length).toBe(0)
730 expect(workerNode.usage.elu.active.history.length).toBe(0)
731 }
732 await pool.destroy()
733 })
734
735 it("Verify that pool event emitter 'full' event can register a callback", async () => {
736 const pool = new DynamicThreadPool(
737 Math.floor(numberOfWorkers / 2),
738 numberOfWorkers,
739 './tests/worker-files/thread/testWorker.js'
740 )
741 const promises = new Set()
742 let poolFull = 0
743 let poolInfo
744 pool.emitter.on(PoolEvents.full, (info) => {
745 ++poolFull
746 poolInfo = info
747 })
748 for (let i = 0; i < numberOfWorkers * 2; i++) {
749 promises.add(pool.execute())
750 }
751 await Promise.all(promises)
752 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
753 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
754 expect(poolFull).toBe(numberOfWorkers * 2 - 1)
755 expect(poolInfo).toStrictEqual({
756 version,
757 type: PoolTypes.dynamic,
758 worker: WorkerTypes.thread,
759 ready: expect.any(Boolean),
760 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
761 minSize: expect.any(Number),
762 maxSize: expect.any(Number),
763 workerNodes: expect.any(Number),
764 idleWorkerNodes: expect.any(Number),
765 busyWorkerNodes: expect.any(Number),
766 executedTasks: expect.any(Number),
767 executingTasks: expect.any(Number),
768 failedTasks: expect.any(Number)
769 })
770 await pool.destroy()
771 })
772
773 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
774 const pool = new DynamicClusterPool(
775 Math.floor(numberOfWorkers / 2),
776 numberOfWorkers,
777 './tests/worker-files/cluster/testWorker.js'
778 )
779 let poolInfo
780 let poolReady = 0
781 pool.emitter.on(PoolEvents.ready, (info) => {
782 ++poolReady
783 poolInfo = info
784 })
785 await waitPoolEvents(pool, PoolEvents.ready, 1)
786 expect(poolReady).toBe(1)
787 expect(poolInfo).toStrictEqual({
788 version,
789 type: PoolTypes.dynamic,
790 worker: WorkerTypes.cluster,
791 ready: true,
792 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
793 minSize: expect.any(Number),
794 maxSize: expect.any(Number),
795 workerNodes: expect.any(Number),
796 idleWorkerNodes: expect.any(Number),
797 busyWorkerNodes: expect.any(Number),
798 executedTasks: expect.any(Number),
799 executingTasks: expect.any(Number),
800 failedTasks: expect.any(Number)
801 })
802 await pool.destroy()
803 })
804
805 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
806 const pool = new FixedThreadPool(
807 numberOfWorkers,
808 './tests/worker-files/thread/testWorker.js'
809 )
810 const promises = new Set()
811 let poolBusy = 0
812 let poolInfo
813 pool.emitter.on(PoolEvents.busy, (info) => {
814 ++poolBusy
815 poolInfo = info
816 })
817 for (let i = 0; i < numberOfWorkers * 2; i++) {
818 promises.add(pool.execute())
819 }
820 await Promise.all(promises)
821 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
822 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
823 expect(poolBusy).toBe(numberOfWorkers + 1)
824 expect(poolInfo).toStrictEqual({
825 version,
826 type: PoolTypes.fixed,
827 worker: WorkerTypes.thread,
828 ready: expect.any(Boolean),
829 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
830 minSize: expect.any(Number),
831 maxSize: expect.any(Number),
832 workerNodes: expect.any(Number),
833 idleWorkerNodes: expect.any(Number),
834 busyWorkerNodes: expect.any(Number),
835 executedTasks: expect.any(Number),
836 executingTasks: expect.any(Number),
837 failedTasks: expect.any(Number)
838 })
839 await pool.destroy()
840 })
841
842 it('Verify that multiple tasks worker is working', async () => {
843 const pool = new DynamicClusterPool(
844 Math.floor(numberOfWorkers / 2),
845 numberOfWorkers,
846 './tests/worker-files/cluster/testMultiTasksWorker.js'
847 )
848 const data = { n: 10 }
849 const result0 = await pool.execute(data)
850 expect(result0).toStrictEqual({ ok: 1 })
851 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
852 expect(result1).toStrictEqual({ ok: 1 })
853 const result2 = await pool.execute(data, 'factorial')
854 expect(result2).toBe(3628800)
855 const result3 = await pool.execute(data, 'fibonacci')
856 expect(result3).toBe(55)
857 })
858 })