1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 // weights: expect.objectContaining({
254 // 0: expect.any(Number),
255 // [pool.info.maxSize - 1]: expect.any(Number)
261 const testHandler = () => console.info('test handler executed')
262 pool = new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.mjs',
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 weights: { 0: 300, 1: 200 }
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: { concurrency: 2 },
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
281 expect(pool.emitter).toBeUndefined()
282 expect(pool.opts).toStrictEqual({
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
289 size: Math.pow(numberOfWorkers, 2),
291 tasksStealingOnBackPressure: true,
292 tasksFinishedTimeout: 2000
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
296 runTime: { median: true },
297 weights: { 0: 300, 1: 200 }
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
311 weights: { 0: 300, 1: 200 }
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
328 it('Verify that pool options are validated', () => {
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategy: 'invalidStrategy'
338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { weights: {} }
350 'Invalid worker choice strategy options: must have a weight for each worker node'
357 './tests/worker-files/thread/testWorker.mjs',
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
378 new TypeError('Invalid tasks queue options: must be a plain object')
384 './tests/worker-files/thread/testWorker.mjs',
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { concurrency: -1 }
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
427 './tests/worker-files/thread/testWorker.mjs',
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 './tests/worker-files/thread/testWorker.mjs',
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 './tests/worker-files/thread/testWorker.mjs',
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
468 it('Verify that pool worker choice strategy options can be set', async () => {
469 const pool = new FixedThreadPool(
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual(
490 expect.objectContaining({
493 Object.keys(workerChoiceStrategy.opts.weights).length,
494 runTime: { median: false },
495 waitTime: { median: false },
496 elu: { median: false }
497 // weights: expect.objectContaining({
498 // 0: expect.any(Number),
499 // [pool.info.maxSize - 1]: expect.any(Number)
505 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
523 pool.setWorkerChoiceStrategyOptions({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
528 runTime: { median: true },
529 elu: { median: true }
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
534 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
535 runTime: { median: true },
536 waitTime: { median: false },
537 elu: { median: true },
538 weights: expect.objectContaining({
539 0: expect.any(Number),
540 [pool.info.maxSize - 1]: expect.any(Number)
543 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
544 .workerChoiceStrategies) {
545 expect(workerChoiceStrategy.opts).toStrictEqual(
546 expect.objectContaining({
549 Object.keys(workerChoiceStrategy.opts.weights).length,
550 runTime: { median: true },
551 waitTime: { median: false },
552 elu: { median: true }
553 // weights: expect.objectContaining({
554 // 0: expect.any(Number),
555 // [pool.info.maxSize - 1]: expect.any(Number)
561 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
579 pool.setWorkerChoiceStrategyOptions({
580 runTime: { median: false },
581 elu: { median: false }
583 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
584 runTime: { median: false },
585 elu: { median: false }
587 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
590 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
591 runTime: { median: false },
592 waitTime: { median: false },
593 elu: { median: false },
594 weights: expect.objectContaining({
595 0: expect.any(Number),
596 [pool.info.maxSize - 1]: expect.any(Number)
599 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
600 .workerChoiceStrategies) {
601 expect(workerChoiceStrategy.opts).toStrictEqual(
602 expect.objectContaining({
605 Object.keys(workerChoiceStrategy.opts.weights).length,
606 runTime: { median: false },
607 waitTime: { median: false },
608 elu: { median: false }
609 // weights: expect.objectContaining({
610 // 0: expect.any(Number),
611 // [pool.info.maxSize - 1]: expect.any(Number)
617 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
636 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
639 'Invalid worker choice strategy options: must be a plain object'
642 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
644 'Invalid worker choice strategy options: must have a weight for each worker node'
648 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
651 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
657 it('Verify that pool tasks queue can be enabled/disabled', async () => {
658 const pool = new FixedThreadPool(
660 './tests/worker-files/thread/testWorker.mjs'
662 expect(pool.opts.enableTasksQueue).toBe(false)
663 expect(pool.opts.tasksQueueOptions).toBeUndefined()
664 pool.enableTasksQueue(true)
665 expect(pool.opts.enableTasksQueue).toBe(true)
666 expect(pool.opts.tasksQueueOptions).toStrictEqual({
668 size: Math.pow(numberOfWorkers, 2),
670 tasksStealingOnBackPressure: true,
671 tasksFinishedTimeout: 2000
673 pool.enableTasksQueue(true, { concurrency: 2 })
674 expect(pool.opts.enableTasksQueue).toBe(true)
675 expect(pool.opts.tasksQueueOptions).toStrictEqual({
677 size: Math.pow(numberOfWorkers, 2),
679 tasksStealingOnBackPressure: true,
680 tasksFinishedTimeout: 2000
682 pool.enableTasksQueue(false)
683 expect(pool.opts.enableTasksQueue).toBe(false)
684 expect(pool.opts.tasksQueueOptions).toBeUndefined()
688 it('Verify that pool tasks queue options can be set', async () => {
689 const pool = new FixedThreadPool(
691 './tests/worker-files/thread/testWorker.mjs',
692 { enableTasksQueue: true }
694 expect(pool.opts.tasksQueueOptions).toStrictEqual({
696 size: Math.pow(numberOfWorkers, 2),
698 tasksStealingOnBackPressure: true,
699 tasksFinishedTimeout: 2000
701 for (const workerNode of pool.workerNodes) {
702 expect(workerNode.tasksQueueBackPressureSize).toBe(
703 pool.opts.tasksQueueOptions.size
706 pool.setTasksQueueOptions({
710 tasksStealingOnBackPressure: false,
711 tasksFinishedTimeout: 3000
713 expect(pool.opts.tasksQueueOptions).toStrictEqual({
717 tasksStealingOnBackPressure: false,
718 tasksFinishedTimeout: 3000
720 for (const workerNode of pool.workerNodes) {
721 expect(workerNode.tasksQueueBackPressureSize).toBe(
722 pool.opts.tasksQueueOptions.size
725 pool.setTasksQueueOptions({
728 tasksStealingOnBackPressure: true
730 expect(pool.opts.tasksQueueOptions).toStrictEqual({
732 size: Math.pow(numberOfWorkers, 2),
734 tasksStealingOnBackPressure: true,
735 tasksFinishedTimeout: 2000
737 for (const workerNode of pool.workerNodes) {
738 expect(workerNode.tasksQueueBackPressureSize).toBe(
739 pool.opts.tasksQueueOptions.size
742 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
743 new TypeError('Invalid tasks queue options: must be a plain object')
745 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
747 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
750 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
752 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
755 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
756 new TypeError('Invalid worker node tasks concurrency: must be an integer')
758 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
760 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
763 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
765 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
768 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
769 new TypeError('Invalid worker node tasks queue size: must be an integer')
774 it('Verify that pool info is set', async () => {
775 let pool = new FixedThreadPool(
777 './tests/worker-files/thread/testWorker.mjs'
779 expect(pool.info).toStrictEqual({
781 type: PoolTypes.fixed,
782 worker: WorkerTypes.thread,
785 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
786 minSize: numberOfWorkers,
787 maxSize: numberOfWorkers,
788 workerNodes: numberOfWorkers,
789 idleWorkerNodes: numberOfWorkers,
796 pool = new DynamicClusterPool(
797 Math.floor(numberOfWorkers / 2),
799 './tests/worker-files/cluster/testWorker.js'
801 expect(pool.info).toStrictEqual({
803 type: PoolTypes.dynamic,
804 worker: WorkerTypes.cluster,
807 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
808 minSize: Math.floor(numberOfWorkers / 2),
809 maxSize: numberOfWorkers,
810 workerNodes: Math.floor(numberOfWorkers / 2),
811 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
820 it('Verify that pool worker tasks usage are initialized', async () => {
821 const pool = new FixedClusterPool(
823 './tests/worker-files/cluster/testWorker.js'
825 for (const workerNode of pool.workerNodes) {
826 expect(workerNode).toBeInstanceOf(WorkerNode)
827 expect(workerNode.usage).toStrictEqual({
833 sequentiallyStolen: 0,
838 history: new CircularArray()
841 history: new CircularArray()
845 history: new CircularArray()
848 history: new CircularArray()
856 it('Verify that pool worker tasks queue are initialized', async () => {
857 let pool = new FixedClusterPool(
859 './tests/worker-files/cluster/testWorker.js'
861 for (const workerNode of pool.workerNodes) {
862 expect(workerNode).toBeInstanceOf(WorkerNode)
863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
864 expect(workerNode.tasksQueue.size).toBe(0)
865 expect(workerNode.tasksQueue.maxSize).toBe(0)
868 pool = new DynamicThreadPool(
869 Math.floor(numberOfWorkers / 2),
871 './tests/worker-files/thread/testWorker.mjs'
873 for (const workerNode of pool.workerNodes) {
874 expect(workerNode).toBeInstanceOf(WorkerNode)
875 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
876 expect(workerNode.tasksQueue.size).toBe(0)
877 expect(workerNode.tasksQueue.maxSize).toBe(0)
882 it('Verify that pool worker info are initialized', async () => {
883 let pool = new FixedClusterPool(
885 './tests/worker-files/cluster/testWorker.js'
887 for (const workerNode of pool.workerNodes) {
888 expect(workerNode).toBeInstanceOf(WorkerNode)
889 expect(workerNode.info).toStrictEqual({
890 id: expect.any(Number),
891 type: WorkerTypes.cluster,
897 pool = new DynamicThreadPool(
898 Math.floor(numberOfWorkers / 2),
900 './tests/worker-files/thread/testWorker.mjs'
902 for (const workerNode of pool.workerNodes) {
903 expect(workerNode).toBeInstanceOf(WorkerNode)
904 expect(workerNode.info).toStrictEqual({
905 id: expect.any(Number),
906 type: WorkerTypes.thread,
914 it('Verify that pool statuses are checked at start or destroy', async () => {
915 const pool = new FixedThreadPool(
917 './tests/worker-files/thread/testWorker.mjs'
919 expect(pool.info.started).toBe(true)
920 expect(pool.info.ready).toBe(true)
921 expect(() => pool.start()).toThrow(
922 new Error('Cannot start an already started pool')
925 expect(pool.info.started).toBe(false)
926 expect(pool.info.ready).toBe(false)
927 await expect(pool.destroy()).rejects.toThrow(
928 new Error('Cannot destroy an already destroyed pool')
932 it('Verify that pool can be started after initialization', async () => {
933 const pool = new FixedClusterPool(
935 './tests/worker-files/cluster/testWorker.js',
940 expect(pool.info.started).toBe(false)
941 expect(pool.info.ready).toBe(false)
942 expect(pool.readyEventEmitted).toBe(false)
943 expect(pool.workerNodes).toStrictEqual([])
944 await expect(pool.execute()).rejects.toThrow(
945 new Error('Cannot execute a task on not started pool')
948 expect(pool.info.started).toBe(true)
949 expect(pool.info.ready).toBe(true)
950 await waitPoolEvents(pool, PoolEvents.ready, 1)
951 expect(pool.readyEventEmitted).toBe(true)
952 expect(pool.workerNodes.length).toBe(numberOfWorkers)
953 for (const workerNode of pool.workerNodes) {
954 expect(workerNode).toBeInstanceOf(WorkerNode)
959 it('Verify that pool execute() arguments are checked', async () => {
960 const pool = new FixedClusterPool(
962 './tests/worker-files/cluster/testWorker.js'
964 await expect(pool.execute(undefined, 0)).rejects.toThrow(
965 new TypeError('name argument must be a string')
967 await expect(pool.execute(undefined, '')).rejects.toThrow(
968 new TypeError('name argument must not be an empty string')
970 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
971 new TypeError('transferList argument must be an array')
973 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
974 "Task function 'unknown' not found"
977 await expect(pool.execute()).rejects.toThrow(
978 new Error('Cannot execute a task on not started pool')
982 it('Verify that pool worker tasks usage are computed', async () => {
983 const pool = new FixedClusterPool(
985 './tests/worker-files/cluster/testWorker.js'
987 const promises = new Set()
988 const maxMultiplier = 2
989 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
990 promises.add(pool.execute())
992 for (const workerNode of pool.workerNodes) {
993 expect(workerNode.usage).toStrictEqual({
996 executing: maxMultiplier,
999 sequentiallyStolen: 0,
1004 history: expect.any(CircularArray)
1007 history: expect.any(CircularArray)
1011 history: expect.any(CircularArray)
1014 history: expect.any(CircularArray)
1019 await Promise.all(promises)
1020 for (const workerNode of pool.workerNodes) {
1021 expect(workerNode.usage).toStrictEqual({
1023 executed: maxMultiplier,
1027 sequentiallyStolen: 0,
1032 history: expect.any(CircularArray)
1035 history: expect.any(CircularArray)
1039 history: expect.any(CircularArray)
1042 history: expect.any(CircularArray)
1047 await pool.destroy()
1050 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1051 const pool = new DynamicThreadPool(
1052 Math.floor(numberOfWorkers / 2),
1054 './tests/worker-files/thread/testWorker.mjs'
1056 const promises = new Set()
1057 const maxMultiplier = 2
1058 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1059 promises.add(pool.execute())
1061 await Promise.all(promises)
1062 for (const workerNode of pool.workerNodes) {
1063 expect(workerNode.usage).toStrictEqual({
1065 executed: expect.any(Number),
1069 sequentiallyStolen: 0,
1074 history: expect.any(CircularArray)
1077 history: expect.any(CircularArray)
1081 history: expect.any(CircularArray)
1084 history: expect.any(CircularArray)
1088 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1089 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1090 numberOfWorkers * maxMultiplier
1092 expect(workerNode.usage.runTime.history.length).toBe(0)
1093 expect(workerNode.usage.waitTime.history.length).toBe(0)
1094 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1095 expect(workerNode.usage.elu.active.history.length).toBe(0)
1097 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1098 for (const workerNode of pool.workerNodes) {
1099 expect(workerNode.usage).toStrictEqual({
1105 sequentiallyStolen: 0,
1110 history: expect.any(CircularArray)
1113 history: expect.any(CircularArray)
1117 history: expect.any(CircularArray)
1120 history: expect.any(CircularArray)
1124 expect(workerNode.usage.runTime.history.length).toBe(0)
1125 expect(workerNode.usage.waitTime.history.length).toBe(0)
1126 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1127 expect(workerNode.usage.elu.active.history.length).toBe(0)
1129 await pool.destroy()
1132 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1133 const pool = new DynamicClusterPool(
1134 Math.floor(numberOfWorkers / 2),
1136 './tests/worker-files/cluster/testWorker.js'
1138 expect(pool.emitter.eventNames()).toStrictEqual([])
1141 pool.emitter.on(PoolEvents.ready, info => {
1145 await waitPoolEvents(pool, PoolEvents.ready, 1)
1146 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1147 expect(poolReady).toBe(1)
1148 expect(poolInfo).toStrictEqual({
1150 type: PoolTypes.dynamic,
1151 worker: WorkerTypes.cluster,
1154 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1155 minSize: expect.any(Number),
1156 maxSize: expect.any(Number),
1157 workerNodes: expect.any(Number),
1158 idleWorkerNodes: expect.any(Number),
1159 busyWorkerNodes: expect.any(Number),
1160 executedTasks: expect.any(Number),
1161 executingTasks: expect.any(Number),
1162 failedTasks: expect.any(Number)
1164 await pool.destroy()
1167 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1168 const pool = new FixedThreadPool(
1170 './tests/worker-files/thread/testWorker.mjs'
1172 expect(pool.emitter.eventNames()).toStrictEqual([])
1173 const promises = new Set()
1176 pool.emitter.on(PoolEvents.busy, info => {
1180 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1181 for (let i = 0; i < numberOfWorkers * 2; i++) {
1182 promises.add(pool.execute())
1184 await Promise.all(promises)
1185 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1186 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1187 expect(poolBusy).toBe(numberOfWorkers + 1)
1188 expect(poolInfo).toStrictEqual({
1190 type: PoolTypes.fixed,
1191 worker: WorkerTypes.thread,
1194 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1195 minSize: expect.any(Number),
1196 maxSize: expect.any(Number),
1197 workerNodes: expect.any(Number),
1198 idleWorkerNodes: expect.any(Number),
1199 busyWorkerNodes: expect.any(Number),
1200 executedTasks: expect.any(Number),
1201 executingTasks: expect.any(Number),
1202 failedTasks: expect.any(Number)
1204 await pool.destroy()
1207 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1208 const pool = new DynamicThreadPool(
1209 Math.floor(numberOfWorkers / 2),
1211 './tests/worker-files/thread/testWorker.mjs'
1213 expect(pool.emitter.eventNames()).toStrictEqual([])
1214 const promises = new Set()
1217 pool.emitter.on(PoolEvents.full, info => {
1221 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1222 for (let i = 0; i < numberOfWorkers * 2; i++) {
1223 promises.add(pool.execute())
1225 await Promise.all(promises)
1226 expect(poolFull).toBe(1)
1227 expect(poolInfo).toStrictEqual({
1229 type: PoolTypes.dynamic,
1230 worker: WorkerTypes.thread,
1233 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1234 minSize: expect.any(Number),
1235 maxSize: expect.any(Number),
1236 workerNodes: expect.any(Number),
1237 idleWorkerNodes: expect.any(Number),
1238 busyWorkerNodes: expect.any(Number),
1239 executedTasks: expect.any(Number),
1240 executingTasks: expect.any(Number),
1241 failedTasks: expect.any(Number)
1243 await pool.destroy()
1246 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1247 const pool = new FixedThreadPool(
1249 './tests/worker-files/thread/testWorker.mjs',
1251 enableTasksQueue: true
1254 stub(pool, 'hasBackPressure').returns(true)
1255 expect(pool.emitter.eventNames()).toStrictEqual([])
1256 const promises = new Set()
1257 let poolBackPressure = 0
1259 pool.emitter.on(PoolEvents.backPressure, info => {
1263 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1264 for (let i = 0; i < numberOfWorkers + 1; i++) {
1265 promises.add(pool.execute())
1267 await Promise.all(promises)
1268 expect(poolBackPressure).toBe(1)
1269 expect(poolInfo).toStrictEqual({
1271 type: PoolTypes.fixed,
1272 worker: WorkerTypes.thread,
1275 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1276 minSize: expect.any(Number),
1277 maxSize: expect.any(Number),
1278 workerNodes: expect.any(Number),
1279 idleWorkerNodes: expect.any(Number),
1280 busyWorkerNodes: expect.any(Number),
1281 executedTasks: expect.any(Number),
1282 executingTasks: expect.any(Number),
1283 maxQueuedTasks: expect.any(Number),
1284 queuedTasks: expect.any(Number),
1286 stolenTasks: expect.any(Number),
1287 failedTasks: expect.any(Number)
1289 expect(pool.hasBackPressure.callCount).toBe(5)
1290 await pool.destroy()
1293 it('Verify that destroy() waits for queued tasks to finish', async () => {
1294 const tasksFinishedTimeout = 2500
1295 const pool = new FixedThreadPool(
1297 './tests/worker-files/thread/asyncWorker.mjs',
1299 enableTasksQueue: true,
1300 tasksQueueOptions: { tasksFinishedTimeout }
1303 const maxMultiplier = 4
1304 let tasksFinished = 0
1305 for (const workerNode of pool.workerNodes) {
1306 workerNode.on('taskFinished', () => {
1310 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1313 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1314 const startTime = performance.now()
1315 await pool.destroy()
1316 const elapsedTime = performance.now() - startTime
1317 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1318 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1319 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1322 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1323 const tasksFinishedTimeout = 1000
1324 const pool = new FixedThreadPool(
1326 './tests/worker-files/thread/asyncWorker.mjs',
1328 enableTasksQueue: true,
1329 tasksQueueOptions: { tasksFinishedTimeout }
1332 const maxMultiplier = 4
1333 let tasksFinished = 0
1334 for (const workerNode of pool.workerNodes) {
1335 workerNode.on('taskFinished', () => {
1339 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1342 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1343 const startTime = performance.now()
1344 await pool.destroy()
1345 const elapsedTime = performance.now() - startTime
1346 expect(tasksFinished).toBe(0)
1347 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1350 it('Verify that pool asynchronous resource track tasks execution', async () => {
1355 let resolveCalls = 0
1356 const hook = createHook({
1357 init (asyncId, type) {
1358 if (type === 'poolifier:task') {
1360 taskAsyncId = asyncId
1364 if (asyncId === taskAsyncId) beforeCalls++
1367 if (asyncId === taskAsyncId) afterCalls++
1370 if (executionAsyncId() === taskAsyncId) resolveCalls++
1373 const pool = new FixedThreadPool(
1375 './tests/worker-files/thread/testWorker.mjs'
1378 await pool.execute()
1380 expect(initCalls).toBe(1)
1381 expect(beforeCalls).toBe(1)
1382 expect(afterCalls).toBe(1)
1383 expect(resolveCalls).toBe(1)
1384 await pool.destroy()
1387 it('Verify that hasTaskFunction() is working', async () => {
1388 const dynamicThreadPool = new DynamicThreadPool(
1389 Math.floor(numberOfWorkers / 2),
1391 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1393 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1394 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1395 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1398 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1399 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1400 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1401 await dynamicThreadPool.destroy()
1402 const fixedClusterPool = new FixedClusterPool(
1404 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1406 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1407 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1408 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1411 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1412 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1413 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1414 await fixedClusterPool.destroy()
1417 it('Verify that addTaskFunction() is working', async () => {
1418 const dynamicThreadPool = new DynamicThreadPool(
1419 Math.floor(numberOfWorkers / 2),
1421 './tests/worker-files/thread/testWorker.mjs'
1423 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1425 dynamicThreadPool.addTaskFunction(0, () => {})
1426 ).rejects.toThrow(new TypeError('name argument must be a string'))
1428 dynamicThreadPool.addTaskFunction('', () => {})
1430 new TypeError('name argument must not be an empty string')
1432 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1433 new TypeError('fn argument must be a function')
1435 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1436 new TypeError('fn argument must be a function')
1438 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1442 const echoTaskFunction = data => {
1446 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1447 ).resolves.toBe(true)
1448 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1449 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1452 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1457 const taskFunctionData = { test: 'test' }
1458 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1459 expect(echoResult).toStrictEqual(taskFunctionData)
1460 for (const workerNode of dynamicThreadPool.workerNodes) {
1461 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1463 executed: expect.any(Number),
1466 sequentiallyStolen: 0,
1471 history: new CircularArray()
1474 history: new CircularArray()
1478 history: new CircularArray()
1481 history: new CircularArray()
1486 await dynamicThreadPool.destroy()
1489 it('Verify that removeTaskFunction() is working', async () => {
1490 const dynamicThreadPool = new DynamicThreadPool(
1491 Math.floor(numberOfWorkers / 2),
1493 './tests/worker-files/thread/testWorker.mjs'
1495 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1496 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1500 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1501 new Error('Cannot remove a task function not handled on the pool side')
1503 const echoTaskFunction = data => {
1506 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1507 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1508 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1511 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1516 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1519 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1520 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1521 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 await dynamicThreadPool.destroy()
1528 it('Verify that listTaskFunctionNames() is working', async () => {
1529 const dynamicThreadPool = new DynamicThreadPool(
1530 Math.floor(numberOfWorkers / 2),
1532 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1534 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1535 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1537 'jsonIntegerSerialization',
1541 await dynamicThreadPool.destroy()
1542 const fixedClusterPool = new FixedClusterPool(
1544 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1546 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1547 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1549 'jsonIntegerSerialization',
1553 await fixedClusterPool.destroy()
1556 it('Verify that setDefaultTaskFunction() is working', async () => {
1557 const dynamicThreadPool = new DynamicThreadPool(
1558 Math.floor(numberOfWorkers / 2),
1560 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1562 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1563 const workerId = dynamicThreadPool.workerNodes[0].info.id
1564 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1566 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1570 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1573 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1577 dynamicThreadPool.setDefaultTaskFunction('unknown')
1580 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1583 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1585 'jsonIntegerSerialization',
1590 dynamicThreadPool.setDefaultTaskFunction('factorial')
1591 ).resolves.toBe(true)
1592 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1595 'jsonIntegerSerialization',
1599 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1600 ).resolves.toBe(true)
1601 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1604 'jsonIntegerSerialization',
1607 await dynamicThreadPool.destroy()
1610 it('Verify that multiple task functions worker is working', async () => {
1611 const pool = new DynamicClusterPool(
1612 Math.floor(numberOfWorkers / 2),
1614 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1616 const data = { n: 10 }
1617 const result0 = await pool.execute(data)
1618 expect(result0).toStrictEqual({ ok: 1 })
1619 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1620 expect(result1).toStrictEqual({ ok: 1 })
1621 const result2 = await pool.execute(data, 'factorial')
1622 expect(result2).toBe(3628800)
1623 const result3 = await pool.execute(data, 'fibonacci')
1624 expect(result3).toBe(55)
1625 expect(pool.info.executingTasks).toBe(0)
1626 expect(pool.info.executedTasks).toBe(4)
1627 for (const workerNode of pool.workerNodes) {
1628 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1630 'jsonIntegerSerialization',
1634 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1635 for (const name of pool.listTaskFunctionNames()) {
1636 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1638 executed: expect.any(Number),
1642 sequentiallyStolen: 0,
1646 history: expect.any(CircularArray)
1649 history: expect.any(CircularArray)
1653 history: expect.any(CircularArray)
1656 history: expect.any(CircularArray)
1661 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1662 ).toBeGreaterThan(0)
1665 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1667 workerNode.getTaskFunctionWorkerUsage(
1668 workerNode.info.taskFunctionNames[1]
1672 await pool.destroy()
1675 it('Verify sendKillMessageToWorker()', async () => {
1676 const pool = new DynamicClusterPool(
1677 Math.floor(numberOfWorkers / 2),
1679 './tests/worker-files/cluster/testWorker.js'
1681 const workerNodeKey = 0
1683 pool.sendKillMessageToWorker(workerNodeKey)
1684 ).resolves.toBeUndefined()
1686 pool.sendKillMessageToWorker(numberOfWorkers)
1687 ).rejects.toStrictEqual(
1688 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1690 await pool.destroy()
1693 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1694 const pool = new DynamicClusterPool(
1695 Math.floor(numberOfWorkers / 2),
1697 './tests/worker-files/cluster/testWorker.js'
1699 const workerNodeKey = 0
1701 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1702 taskFunctionOperation: 'add',
1703 taskFunctionName: 'empty',
1704 taskFunction: (() => {}).toString()
1706 ).resolves.toBe(true)
1708 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1709 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1710 await pool.destroy()
1713 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1714 const pool = new DynamicClusterPool(
1715 Math.floor(numberOfWorkers / 2),
1717 './tests/worker-files/cluster/testWorker.js'
1720 pool.sendTaskFunctionOperationToWorkers({
1721 taskFunctionOperation: 'add',
1722 taskFunctionName: 'empty',
1723 taskFunction: (() => {}).toString()
1725 ).resolves.toBe(true)
1726 for (const workerNode of pool.workerNodes) {
1727 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1733 await pool.destroy()