1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
232 retries: pool.info.maxSize,
233 runTime: { median: false },
234 waitTime: { median: false },
235 elu: { median: false }
237 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
238 .workerChoiceStrategies) {
239 expect(workerChoiceStrategy.opts).toStrictEqual({
240 retries: pool.info.maxSize,
241 runTime: { median: false },
242 waitTime: { median: false },
243 elu: { median: false }
247 const testHandler = () => console.info('test handler executed')
248 pool = new FixedThreadPool(
250 './tests/worker-files/thread/testWorker.mjs',
252 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
253 workerChoiceStrategyOptions: {
254 runTime: { median: true },
255 weights: { 0: 300, 1: 200 }
258 restartWorkerOnError: false,
259 enableTasksQueue: true,
260 tasksQueueOptions: { concurrency: 2 },
261 messageHandler: testHandler,
262 errorHandler: testHandler,
263 onlineHandler: testHandler,
264 exitHandler: testHandler
267 expect(pool.emitter).toBeUndefined()
268 expect(pool.opts).toStrictEqual({
271 restartWorkerOnError: false,
272 enableTasksQueue: true,
275 size: Math.pow(numberOfWorkers, 2),
277 tasksStealingOnBackPressure: true,
278 tasksFinishedTimeout: 2000
280 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
281 workerChoiceStrategyOptions: {
282 runTime: { median: true },
283 weights: { 0: 300, 1: 200 }
285 onlineHandler: testHandler,
286 messageHandler: testHandler,
287 errorHandler: testHandler,
288 exitHandler: testHandler
290 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
291 retries: pool.info.maxSize,
292 runTime: { median: true },
293 waitTime: { median: false },
294 elu: { median: false },
295 weights: { 0: 300, 1: 200 }
297 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
298 .workerChoiceStrategies) {
299 expect(workerChoiceStrategy.opts).toStrictEqual({
300 retries: pool.info.maxSize,
301 runTime: { median: true },
302 waitTime: { median: false },
303 elu: { median: false },
304 weights: { 0: 300, 1: 200 }
310 it('Verify that pool options are validated', () => {
315 './tests/worker-files/thread/testWorker.mjs',
317 workerChoiceStrategy: 'invalidStrategy'
320 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
325 './tests/worker-files/thread/testWorker.mjs',
327 workerChoiceStrategyOptions: { weights: {} }
332 'Invalid worker choice strategy options: must have a weight for each worker node'
339 './tests/worker-files/thread/testWorker.mjs',
341 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
346 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
353 './tests/worker-files/thread/testWorker.mjs',
355 enableTasksQueue: true,
356 tasksQueueOptions: 'invalidTasksQueueOptions'
360 new TypeError('Invalid tasks queue options: must be a plain object')
366 './tests/worker-files/thread/testWorker.mjs',
368 enableTasksQueue: true,
369 tasksQueueOptions: { concurrency: 0 }
374 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
381 './tests/worker-files/thread/testWorker.mjs',
383 enableTasksQueue: true,
384 tasksQueueOptions: { concurrency: -1 }
389 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
396 './tests/worker-files/thread/testWorker.mjs',
398 enableTasksQueue: true,
399 tasksQueueOptions: { concurrency: 0.2 }
403 new TypeError('Invalid worker node tasks concurrency: must be an integer')
409 './tests/worker-files/thread/testWorker.mjs',
411 enableTasksQueue: true,
412 tasksQueueOptions: { size: 0 }
417 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
424 './tests/worker-files/thread/testWorker.mjs',
426 enableTasksQueue: true,
427 tasksQueueOptions: { size: -1 }
432 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
439 './tests/worker-files/thread/testWorker.mjs',
441 enableTasksQueue: true,
442 tasksQueueOptions: { size: 0.2 }
446 new TypeError('Invalid worker node tasks queue size: must be an integer')
450 it('Verify that pool worker choice strategy options can be set', async () => {
451 const pool = new FixedThreadPool(
453 './tests/worker-files/thread/testWorker.mjs',
454 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
456 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
457 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
458 retries: pool.info.maxSize,
459 runTime: { median: false },
460 waitTime: { median: false },
461 elu: { median: false }
463 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
464 .workerChoiceStrategies) {
465 expect(workerChoiceStrategy.opts).toStrictEqual({
466 retries: pool.info.maxSize,
467 runTime: { median: false },
468 waitTime: { median: false },
469 elu: { median: false }
473 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 pool.setWorkerChoiceStrategyOptions({
492 runTime: { median: true },
493 elu: { median: true }
495 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
496 runTime: { median: true },
497 elu: { median: true }
499 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
500 retries: pool.info.maxSize,
501 runTime: { median: true },
502 waitTime: { median: false },
503 elu: { median: true }
505 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
506 .workerChoiceStrategies) {
507 expect(workerChoiceStrategy.opts).toStrictEqual({
508 retries: pool.info.maxSize,
509 runTime: { median: true },
510 waitTime: { median: false },
511 elu: { median: true }
515 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
533 pool.setWorkerChoiceStrategyOptions({
534 runTime: { median: false },
535 elu: { median: false }
537 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
538 runTime: { median: false },
539 elu: { median: false }
541 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
542 retries: pool.info.maxSize,
543 runTime: { median: false },
544 waitTime: { median: false },
545 elu: { median: false }
547 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
548 .workerChoiceStrategies) {
549 expect(workerChoiceStrategy.opts).toStrictEqual({
550 retries: pool.info.maxSize,
551 runTime: { median: false },
552 waitTime: { median: false },
553 elu: { median: false }
557 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
576 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
579 'Invalid worker choice strategy options: must be a plain object'
582 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
584 'Invalid worker choice strategy options: must have a weight for each worker node'
588 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
591 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
597 it('Verify that pool tasks queue can be enabled/disabled', async () => {
598 const pool = new FixedThreadPool(
600 './tests/worker-files/thread/testWorker.mjs'
602 expect(pool.opts.enableTasksQueue).toBe(false)
603 expect(pool.opts.tasksQueueOptions).toBeUndefined()
604 pool.enableTasksQueue(true)
605 expect(pool.opts.enableTasksQueue).toBe(true)
606 expect(pool.opts.tasksQueueOptions).toStrictEqual({
608 size: Math.pow(numberOfWorkers, 2),
610 tasksStealingOnBackPressure: true,
611 tasksFinishedTimeout: 2000
613 pool.enableTasksQueue(true, { concurrency: 2 })
614 expect(pool.opts.enableTasksQueue).toBe(true)
615 expect(pool.opts.tasksQueueOptions).toStrictEqual({
617 size: Math.pow(numberOfWorkers, 2),
619 tasksStealingOnBackPressure: true,
620 tasksFinishedTimeout: 2000
622 pool.enableTasksQueue(false)
623 expect(pool.opts.enableTasksQueue).toBe(false)
624 expect(pool.opts.tasksQueueOptions).toBeUndefined()
628 it('Verify that pool tasks queue options can be set', async () => {
629 const pool = new FixedThreadPool(
631 './tests/worker-files/thread/testWorker.mjs',
632 { enableTasksQueue: true }
634 expect(pool.opts.tasksQueueOptions).toStrictEqual({
636 size: Math.pow(numberOfWorkers, 2),
638 tasksStealingOnBackPressure: true,
639 tasksFinishedTimeout: 2000
641 for (const workerNode of pool.workerNodes) {
642 expect(workerNode.tasksQueueBackPressureSize).toBe(
643 pool.opts.tasksQueueOptions.size
646 pool.setTasksQueueOptions({
650 tasksStealingOnBackPressure: false,
651 tasksFinishedTimeout: 3000
653 expect(pool.opts.tasksQueueOptions).toStrictEqual({
657 tasksStealingOnBackPressure: false,
658 tasksFinishedTimeout: 3000
660 for (const workerNode of pool.workerNodes) {
661 expect(workerNode.tasksQueueBackPressureSize).toBe(
662 pool.opts.tasksQueueOptions.size
665 pool.setTasksQueueOptions({
668 tasksStealingOnBackPressure: true
670 expect(pool.opts.tasksQueueOptions).toStrictEqual({
672 size: Math.pow(numberOfWorkers, 2),
674 tasksStealingOnBackPressure: true,
675 tasksFinishedTimeout: 2000
677 for (const workerNode of pool.workerNodes) {
678 expect(workerNode.tasksQueueBackPressureSize).toBe(
679 pool.opts.tasksQueueOptions.size
682 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
683 new TypeError('Invalid tasks queue options: must be a plain object')
685 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
687 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
690 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
692 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
695 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
696 new TypeError('Invalid worker node tasks concurrency: must be an integer')
698 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
700 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
703 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
705 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
708 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
709 new TypeError('Invalid worker node tasks queue size: must be an integer')
714 it('Verify that pool info is set', async () => {
715 let pool = new FixedThreadPool(
717 './tests/worker-files/thread/testWorker.mjs'
719 expect(pool.info).toStrictEqual({
721 type: PoolTypes.fixed,
722 worker: WorkerTypes.thread,
725 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
726 minSize: numberOfWorkers,
727 maxSize: numberOfWorkers,
728 workerNodes: numberOfWorkers,
729 idleWorkerNodes: numberOfWorkers,
736 pool = new DynamicClusterPool(
737 Math.floor(numberOfWorkers / 2),
739 './tests/worker-files/cluster/testWorker.js'
741 expect(pool.info).toStrictEqual({
743 type: PoolTypes.dynamic,
744 worker: WorkerTypes.cluster,
747 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
748 minSize: Math.floor(numberOfWorkers / 2),
749 maxSize: numberOfWorkers,
750 workerNodes: Math.floor(numberOfWorkers / 2),
751 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
760 it('Verify that pool worker tasks usage are initialized', async () => {
761 const pool = new FixedClusterPool(
763 './tests/worker-files/cluster/testWorker.js'
765 for (const workerNode of pool.workerNodes) {
766 expect(workerNode).toBeInstanceOf(WorkerNode)
767 expect(workerNode.usage).toStrictEqual({
773 sequentiallyStolen: 0,
778 history: new CircularArray()
781 history: new CircularArray()
785 history: new CircularArray()
788 history: new CircularArray()
796 it('Verify that pool worker tasks queue are initialized', async () => {
797 let pool = new FixedClusterPool(
799 './tests/worker-files/cluster/testWorker.js'
801 for (const workerNode of pool.workerNodes) {
802 expect(workerNode).toBeInstanceOf(WorkerNode)
803 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
804 expect(workerNode.tasksQueue.size).toBe(0)
805 expect(workerNode.tasksQueue.maxSize).toBe(0)
808 pool = new DynamicThreadPool(
809 Math.floor(numberOfWorkers / 2),
811 './tests/worker-files/thread/testWorker.mjs'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
816 expect(workerNode.tasksQueue.size).toBe(0)
817 expect(workerNode.tasksQueue.maxSize).toBe(0)
822 it('Verify that pool worker info are initialized', async () => {
823 let pool = new FixedClusterPool(
825 './tests/worker-files/cluster/testWorker.js'
827 for (const workerNode of pool.workerNodes) {
828 expect(workerNode).toBeInstanceOf(WorkerNode)
829 expect(workerNode.info).toStrictEqual({
830 id: expect.any(Number),
831 type: WorkerTypes.cluster,
837 pool = new DynamicThreadPool(
838 Math.floor(numberOfWorkers / 2),
840 './tests/worker-files/thread/testWorker.mjs'
842 for (const workerNode of pool.workerNodes) {
843 expect(workerNode).toBeInstanceOf(WorkerNode)
844 expect(workerNode.info).toStrictEqual({
845 id: expect.any(Number),
846 type: WorkerTypes.thread,
854 it('Verify that pool statuses are checked at start or destroy', async () => {
855 const pool = new FixedThreadPool(
857 './tests/worker-files/thread/testWorker.mjs'
859 expect(pool.info.started).toBe(true)
860 expect(pool.info.ready).toBe(true)
861 expect(() => pool.start()).toThrow(
862 new Error('Cannot start an already started pool')
865 expect(pool.info.started).toBe(false)
866 expect(pool.info.ready).toBe(false)
867 await expect(pool.destroy()).rejects.toThrow(
868 new Error('Cannot destroy an already destroyed pool')
872 it('Verify that pool can be started after initialization', async () => {
873 const pool = new FixedClusterPool(
875 './tests/worker-files/cluster/testWorker.js',
880 expect(pool.info.started).toBe(false)
881 expect(pool.info.ready).toBe(false)
882 expect(pool.readyEventEmitted).toBe(false)
883 expect(pool.workerNodes).toStrictEqual([])
884 await expect(pool.execute()).rejects.toThrow(
885 new Error('Cannot execute a task on not started pool')
888 expect(pool.info.started).toBe(true)
889 expect(pool.info.ready).toBe(true)
890 await waitPoolEvents(pool, PoolEvents.ready, 1)
891 expect(pool.readyEventEmitted).toBe(true)
892 expect(pool.workerNodes.length).toBe(numberOfWorkers)
893 for (const workerNode of pool.workerNodes) {
894 expect(workerNode).toBeInstanceOf(WorkerNode)
899 it('Verify that pool execute() arguments are checked', async () => {
900 const pool = new FixedClusterPool(
902 './tests/worker-files/cluster/testWorker.js'
904 await expect(pool.execute(undefined, 0)).rejects.toThrow(
905 new TypeError('name argument must be a string')
907 await expect(pool.execute(undefined, '')).rejects.toThrow(
908 new TypeError('name argument must not be an empty string')
910 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
911 new TypeError('transferList argument must be an array')
913 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
914 "Task function 'unknown' not found"
917 await expect(pool.execute()).rejects.toThrow(
918 new Error('Cannot execute a task on not started pool')
922 it('Verify that pool worker tasks usage are computed', async () => {
923 const pool = new FixedClusterPool(
925 './tests/worker-files/cluster/testWorker.js'
927 const promises = new Set()
928 const maxMultiplier = 2
929 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
930 promises.add(pool.execute())
932 for (const workerNode of pool.workerNodes) {
933 expect(workerNode.usage).toStrictEqual({
936 executing: maxMultiplier,
939 sequentiallyStolen: 0,
944 history: expect.any(CircularArray)
947 history: expect.any(CircularArray)
951 history: expect.any(CircularArray)
954 history: expect.any(CircularArray)
959 await Promise.all(promises)
960 for (const workerNode of pool.workerNodes) {
961 expect(workerNode.usage).toStrictEqual({
963 executed: maxMultiplier,
967 sequentiallyStolen: 0,
972 history: expect.any(CircularArray)
975 history: expect.any(CircularArray)
979 history: expect.any(CircularArray)
982 history: expect.any(CircularArray)
990 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
991 const pool = new DynamicThreadPool(
992 Math.floor(numberOfWorkers / 2),
994 './tests/worker-files/thread/testWorker.mjs'
996 const promises = new Set()
997 const maxMultiplier = 2
998 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
999 promises.add(pool.execute())
1001 await Promise.all(promises)
1002 for (const workerNode of pool.workerNodes) {
1003 expect(workerNode.usage).toStrictEqual({
1005 executed: expect.any(Number),
1009 sequentiallyStolen: 0,
1014 history: expect.any(CircularArray)
1017 history: expect.any(CircularArray)
1021 history: expect.any(CircularArray)
1024 history: expect.any(CircularArray)
1028 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1029 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1030 numberOfWorkers * maxMultiplier
1032 expect(workerNode.usage.runTime.history.length).toBe(0)
1033 expect(workerNode.usage.waitTime.history.length).toBe(0)
1034 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1035 expect(workerNode.usage.elu.active.history.length).toBe(0)
1037 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1038 for (const workerNode of pool.workerNodes) {
1039 expect(workerNode.usage).toStrictEqual({
1045 sequentiallyStolen: 0,
1050 history: expect.any(CircularArray)
1053 history: expect.any(CircularArray)
1057 history: expect.any(CircularArray)
1060 history: expect.any(CircularArray)
1064 expect(workerNode.usage.runTime.history.length).toBe(0)
1065 expect(workerNode.usage.waitTime.history.length).toBe(0)
1066 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1067 expect(workerNode.usage.elu.active.history.length).toBe(0)
1069 await pool.destroy()
1072 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1073 const pool = new DynamicClusterPool(
1074 Math.floor(numberOfWorkers / 2),
1076 './tests/worker-files/cluster/testWorker.js'
1078 expect(pool.emitter.eventNames()).toStrictEqual([])
1081 pool.emitter.on(PoolEvents.ready, info => {
1085 await waitPoolEvents(pool, PoolEvents.ready, 1)
1086 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1087 expect(poolReady).toBe(1)
1088 expect(poolInfo).toStrictEqual({
1090 type: PoolTypes.dynamic,
1091 worker: WorkerTypes.cluster,
1094 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1095 minSize: expect.any(Number),
1096 maxSize: expect.any(Number),
1097 workerNodes: expect.any(Number),
1098 idleWorkerNodes: expect.any(Number),
1099 busyWorkerNodes: expect.any(Number),
1100 executedTasks: expect.any(Number),
1101 executingTasks: expect.any(Number),
1102 failedTasks: expect.any(Number)
1104 await pool.destroy()
1107 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1108 const pool = new FixedThreadPool(
1110 './tests/worker-files/thread/testWorker.mjs'
1112 expect(pool.emitter.eventNames()).toStrictEqual([])
1113 const promises = new Set()
1116 pool.emitter.on(PoolEvents.busy, info => {
1120 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1121 for (let i = 0; i < numberOfWorkers * 2; i++) {
1122 promises.add(pool.execute())
1124 await Promise.all(promises)
1125 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1126 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1127 expect(poolBusy).toBe(numberOfWorkers + 1)
1128 expect(poolInfo).toStrictEqual({
1130 type: PoolTypes.fixed,
1131 worker: WorkerTypes.thread,
1134 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1135 minSize: expect.any(Number),
1136 maxSize: expect.any(Number),
1137 workerNodes: expect.any(Number),
1138 idleWorkerNodes: expect.any(Number),
1139 busyWorkerNodes: expect.any(Number),
1140 executedTasks: expect.any(Number),
1141 executingTasks: expect.any(Number),
1142 failedTasks: expect.any(Number)
1144 await pool.destroy()
1147 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1148 const pool = new DynamicThreadPool(
1149 Math.floor(numberOfWorkers / 2),
1151 './tests/worker-files/thread/testWorker.mjs'
1153 expect(pool.emitter.eventNames()).toStrictEqual([])
1154 const promises = new Set()
1157 pool.emitter.on(PoolEvents.full, info => {
1161 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1162 for (let i = 0; i < numberOfWorkers * 2; i++) {
1163 promises.add(pool.execute())
1165 await Promise.all(promises)
1166 expect(poolFull).toBe(1)
1167 expect(poolInfo).toStrictEqual({
1169 type: PoolTypes.dynamic,
1170 worker: WorkerTypes.thread,
1173 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1174 minSize: expect.any(Number),
1175 maxSize: expect.any(Number),
1176 workerNodes: expect.any(Number),
1177 idleWorkerNodes: expect.any(Number),
1178 busyWorkerNodes: expect.any(Number),
1179 executedTasks: expect.any(Number),
1180 executingTasks: expect.any(Number),
1181 failedTasks: expect.any(Number)
1183 await pool.destroy()
1186 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1187 const pool = new FixedThreadPool(
1189 './tests/worker-files/thread/testWorker.mjs',
1191 enableTasksQueue: true
1194 stub(pool, 'hasBackPressure').returns(true)
1195 expect(pool.emitter.eventNames()).toStrictEqual([])
1196 const promises = new Set()
1197 let poolBackPressure = 0
1199 pool.emitter.on(PoolEvents.backPressure, info => {
1203 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1204 for (let i = 0; i < numberOfWorkers + 1; i++) {
1205 promises.add(pool.execute())
1207 await Promise.all(promises)
1208 expect(poolBackPressure).toBe(1)
1209 expect(poolInfo).toStrictEqual({
1211 type: PoolTypes.fixed,
1212 worker: WorkerTypes.thread,
1215 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1216 minSize: expect.any(Number),
1217 maxSize: expect.any(Number),
1218 workerNodes: expect.any(Number),
1219 idleWorkerNodes: expect.any(Number),
1220 busyWorkerNodes: expect.any(Number),
1221 executedTasks: expect.any(Number),
1222 executingTasks: expect.any(Number),
1223 maxQueuedTasks: expect.any(Number),
1224 queuedTasks: expect.any(Number),
1226 stolenTasks: expect.any(Number),
1227 failedTasks: expect.any(Number)
1229 expect(pool.hasBackPressure.callCount).toBe(5)
1230 await pool.destroy()
1233 it('Verify that destroy() waits for queued tasks to finish', async () => {
1234 const tasksFinishedTimeout = 2500
1235 const pool = new FixedThreadPool(
1237 './tests/worker-files/thread/asyncWorker.mjs',
1239 enableTasksQueue: true,
1240 tasksQueueOptions: { tasksFinishedTimeout }
1243 const maxMultiplier = 4
1244 let tasksFinished = 0
1245 for (const workerNode of pool.workerNodes) {
1246 workerNode.on('taskFinished', () => {
1250 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1253 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1254 const startTime = performance.now()
1255 await pool.destroy()
1256 const elapsedTime = performance.now() - startTime
1257 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1258 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1259 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1262 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1263 const tasksFinishedTimeout = 1000
1264 const pool = new FixedThreadPool(
1266 './tests/worker-files/thread/asyncWorker.mjs',
1268 enableTasksQueue: true,
1269 tasksQueueOptions: { tasksFinishedTimeout }
1272 const maxMultiplier = 4
1273 let tasksFinished = 0
1274 for (const workerNode of pool.workerNodes) {
1275 workerNode.on('taskFinished', () => {
1279 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1282 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1283 const startTime = performance.now()
1284 await pool.destroy()
1285 const elapsedTime = performance.now() - startTime
1286 expect(tasksFinished).toBe(0)
1287 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1290 it('Verify that pool asynchronous resource track tasks execution', async () => {
1295 let resolveCalls = 0
1296 const hook = createHook({
1297 init (asyncId, type) {
1298 if (type === 'poolifier:task') {
1300 taskAsyncId = asyncId
1304 if (asyncId === taskAsyncId) beforeCalls++
1307 if (asyncId === taskAsyncId) afterCalls++
1310 if (executionAsyncId() === taskAsyncId) resolveCalls++
1313 const pool = new FixedThreadPool(
1315 './tests/worker-files/thread/testWorker.mjs'
1318 await pool.execute()
1320 expect(initCalls).toBe(1)
1321 expect(beforeCalls).toBe(1)
1322 expect(afterCalls).toBe(1)
1323 expect(resolveCalls).toBe(1)
1324 await pool.destroy()
1327 it('Verify that hasTaskFunction() is working', async () => {
1328 const dynamicThreadPool = new DynamicThreadPool(
1329 Math.floor(numberOfWorkers / 2),
1331 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1333 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1334 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1335 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1338 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1339 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1340 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1341 await dynamicThreadPool.destroy()
1342 const fixedClusterPool = new FixedClusterPool(
1344 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1346 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1347 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1348 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1351 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1352 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1353 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1354 await fixedClusterPool.destroy()
1357 it('Verify that addTaskFunction() is working', async () => {
1358 const dynamicThreadPool = new DynamicThreadPool(
1359 Math.floor(numberOfWorkers / 2),
1361 './tests/worker-files/thread/testWorker.mjs'
1363 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1365 dynamicThreadPool.addTaskFunction(0, () => {})
1366 ).rejects.toThrow(new TypeError('name argument must be a string'))
1368 dynamicThreadPool.addTaskFunction('', () => {})
1370 new TypeError('name argument must not be an empty string')
1372 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1373 new TypeError('fn argument must be a function')
1375 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1376 new TypeError('fn argument must be a function')
1378 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1382 const echoTaskFunction = data => {
1386 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1387 ).resolves.toBe(true)
1388 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1389 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1392 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1397 const taskFunctionData = { test: 'test' }
1398 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1399 expect(echoResult).toStrictEqual(taskFunctionData)
1400 for (const workerNode of dynamicThreadPool.workerNodes) {
1401 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1403 executed: expect.any(Number),
1406 sequentiallyStolen: 0,
1411 history: new CircularArray()
1414 history: new CircularArray()
1418 history: new CircularArray()
1421 history: new CircularArray()
1426 await dynamicThreadPool.destroy()
1429 it('Verify that removeTaskFunction() is working', async () => {
1430 const dynamicThreadPool = new DynamicThreadPool(
1431 Math.floor(numberOfWorkers / 2),
1433 './tests/worker-files/thread/testWorker.mjs'
1435 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1436 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1440 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1441 new Error('Cannot remove a task function not handled on the pool side')
1443 const echoTaskFunction = data => {
1446 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1447 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1448 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1451 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1456 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1459 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1460 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1461 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1465 await dynamicThreadPool.destroy()
1468 it('Verify that listTaskFunctionNames() is working', async () => {
1469 const dynamicThreadPool = new DynamicThreadPool(
1470 Math.floor(numberOfWorkers / 2),
1472 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1474 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1475 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1477 'jsonIntegerSerialization',
1481 await dynamicThreadPool.destroy()
1482 const fixedClusterPool = new FixedClusterPool(
1484 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1486 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1487 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1489 'jsonIntegerSerialization',
1493 await fixedClusterPool.destroy()
1496 it('Verify that setDefaultTaskFunction() is working', async () => {
1497 const dynamicThreadPool = new DynamicThreadPool(
1498 Math.floor(numberOfWorkers / 2),
1500 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1502 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1503 const workerId = dynamicThreadPool.workerNodes[0].info.id
1504 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1506 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1510 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1513 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1517 dynamicThreadPool.setDefaultTaskFunction('unknown')
1520 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1523 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 'jsonIntegerSerialization',
1530 dynamicThreadPool.setDefaultTaskFunction('factorial')
1531 ).resolves.toBe(true)
1532 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1535 'jsonIntegerSerialization',
1539 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1540 ).resolves.toBe(true)
1541 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1544 'jsonIntegerSerialization',
1547 await dynamicThreadPool.destroy()
1550 it('Verify that multiple task functions worker is working', async () => {
1551 const pool = new DynamicClusterPool(
1552 Math.floor(numberOfWorkers / 2),
1554 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1556 const data = { n: 10 }
1557 const result0 = await pool.execute(data)
1558 expect(result0).toStrictEqual({ ok: 1 })
1559 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1560 expect(result1).toStrictEqual({ ok: 1 })
1561 const result2 = await pool.execute(data, 'factorial')
1562 expect(result2).toBe(3628800)
1563 const result3 = await pool.execute(data, 'fibonacci')
1564 expect(result3).toBe(55)
1565 expect(pool.info.executingTasks).toBe(0)
1566 expect(pool.info.executedTasks).toBe(4)
1567 for (const workerNode of pool.workerNodes) {
1568 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1570 'jsonIntegerSerialization',
1574 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1575 for (const name of pool.listTaskFunctionNames()) {
1576 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1578 executed: expect.any(Number),
1582 sequentiallyStolen: 0,
1586 history: expect.any(CircularArray)
1589 history: expect.any(CircularArray)
1593 history: expect.any(CircularArray)
1596 history: expect.any(CircularArray)
1601 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1602 ).toBeGreaterThan(0)
1605 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1607 workerNode.getTaskFunctionWorkerUsage(
1608 workerNode.info.taskFunctionNames[1]
1612 await pool.destroy()
1615 it('Verify sendKillMessageToWorker()', async () => {
1616 const pool = new DynamicClusterPool(
1617 Math.floor(numberOfWorkers / 2),
1619 './tests/worker-files/cluster/testWorker.js'
1621 const workerNodeKey = 0
1623 pool.sendKillMessageToWorker(workerNodeKey)
1624 ).resolves.toBeUndefined()
1626 pool.sendKillMessageToWorker(numberOfWorkers)
1627 ).rejects.toStrictEqual(
1628 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1630 await pool.destroy()
1633 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1634 const pool = new DynamicClusterPool(
1635 Math.floor(numberOfWorkers / 2),
1637 './tests/worker-files/cluster/testWorker.js'
1639 const workerNodeKey = 0
1641 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1642 taskFunctionOperation: 'add',
1643 taskFunctionName: 'empty',
1644 taskFunction: (() => {}).toString()
1646 ).resolves.toBe(true)
1648 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1649 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1650 await pool.destroy()
1653 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1654 const pool = new DynamicClusterPool(
1655 Math.floor(numberOfWorkers / 2),
1657 './tests/worker-files/cluster/testWorker.js'
1660 pool.sendTaskFunctionOperationToWorkers({
1661 taskFunctionOperation: 'add',
1662 taskFunctionName: 'empty',
1663 taskFunction: (() => {}).toString()
1665 ).resolves.toBe(true)
1666 for (const workerNode of pool.workerNodes) {
1667 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1673 await pool.destroy()