1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
// Root suite; the fixtures declared here are shared by the test cases that follow.
24 describe('Abstract pool test suite', () => {
// Parsed from the package.json located two directories above this test file.
// NOTE(review): the rest of this expression is not visible in this excerpt — confirm which field is kept.
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
// Worker count passed to the pool constructors throughout the suite.
31 const numberOfWorkers = 2
// FixedThreadPool subclass instantiated by the "non main thread/process" test.
// Its overrides are not visible in this excerpt.
32 class StubPoolWithIsMain extends FixedThreadPool {
// Smoke test: a FixedThreadPool built on the thread test worker is an instance of FixedThreadPool.
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
// Instantiates StubPoolWithIsMain with an errorHandler that logs to console.error.
// The string below is the failure message the test refers to; the assertion call wrapping it is not visible in this excerpt.
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
// Lifecycle flags of a freshly constructed pool: started is true, starting and destroying are false.
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
// Worker file path validation in the pool constructor:
// a missing path and a non-string path (0) throw TypeError, a path that cannot be found throws Error.
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Refers to the "without specifying the number of workers" message.
// The constructor call and the assertion around it are not visible in this excerpt.
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
// Builds a FixedClusterPool with -1 workers; the string below is the associated "negative number of workers" message.
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
// Builds a FixedThreadPool with 0.25 workers; the string below is the associated "non safe integer" message.
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
// Refers to the message about a fixed pool being given a maximum number of workers at initialization.
// The constructor call is not visible in this excerpt.
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
// Size validation for dynamic pools (cluster and thread variants).
// Each constructor call below appears to be paired with the error message that follows it;
// the min/max size arguments are not visible in this excerpt.
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Checks the resolved options of a pool in two scenarios: built without an options object, then built with explicit options.
// In both scenarios the pool options, the strategy context options and each registered strategy's options are compared.
218 it('Verify that pool options are checked', async () => {
// Scenario 1: no options object is passed to the constructor.
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
// Only the first and last weight keys are pinned, and only to "some number".
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual({
248 Object.keys(workerChoiceStrategy.opts.weights).length,
249 runTime: { median: false },
250 waitTime: { median: false },
251 elu: { median: false },
252 weights: expect.objectContaining({
253 0: expect.any(Number),
254 [pool.info.maxSize - 1]: expect.any(Number)
// Scenario 2: explicit options; the same handler function is reused for all four worker event handlers.
259 const testHandler = () => console.info('test handler executed')
260 pool = new FixedThreadPool(
262 './tests/worker-files/thread/testWorker.mjs',
264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
265 workerChoiceStrategyOptions: {
266 runTime: { median: true },
267 weights: { 0: 300, 1: 200 }
270 restartWorkerOnError: false,
271 enableTasksQueue: true,
272 tasksQueueOptions: { concurrency: 2 },
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 onlineHandler: testHandler,
276 exitHandler: testHandler
// NOTE(review): presumably events are disabled by an option not visible in this excerpt — confirm.
279 expect(pool.emitter).toBeUndefined()
280 expect(pool.opts).toStrictEqual({
283 restartWorkerOnError: false,
284 enableTasksQueue: true,
287 size: Math.pow(numberOfWorkers, 2),
289 tasksStealingOnBackPressure: true,
290 tasksFinishedTimeout: 2000
292 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
293 workerChoiceStrategyOptions: {
294 runTime: { median: true },
295 weights: { 0: 300, 1: 200 }
297 onlineHandler: testHandler,
298 messageHandler: testHandler,
299 errorHandler: testHandler,
300 exitHandler: testHandler
302 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
305 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
306 runTime: { median: true },
307 waitTime: { median: false },
308 elu: { median: false },
309 weights: { 0: 300, 1: 200 }
311 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
312 .workerChoiceStrategies) {
313 expect(workerChoiceStrategy.opts).toStrictEqual({
316 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
317 runTime: { median: true },
318 waitTime: { median: false },
319 elu: { median: false },
320 weights: { 0: 300, 1: 200 }
// Constructor-time validation of pool options.
// Each invalid option below (strategy name, weights, measurement, tasks queue options, concurrency, size)
// is followed by the error or message associated with it; the constructor calls themselves are not visible in this excerpt.
326 it('Verify that pool options are validated', () => {
331 './tests/worker-files/thread/testWorker.mjs',
333 workerChoiceStrategy: 'invalidStrategy'
336 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
341 './tests/worker-files/thread/testWorker.mjs',
343 workerChoiceStrategyOptions: { weights: {} }
348 'Invalid worker choice strategy options: must have a weight for each worker node'
355 './tests/worker-files/thread/testWorker.mjs',
357 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
362 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Tasks queue options: non-object value, then zero / negative / fractional concurrency.
369 './tests/worker-files/thread/testWorker.mjs',
371 enableTasksQueue: true,
372 tasksQueueOptions: 'invalidTasksQueueOptions'
376 new TypeError('Invalid tasks queue options: must be a plain object')
382 './tests/worker-files/thread/testWorker.mjs',
384 enableTasksQueue: true,
385 tasksQueueOptions: { concurrency: 0 }
390 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
397 './tests/worker-files/thread/testWorker.mjs',
399 enableTasksQueue: true,
400 tasksQueueOptions: { concurrency: -1 }
405 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
412 './tests/worker-files/thread/testWorker.mjs',
414 enableTasksQueue: true,
415 tasksQueueOptions: { concurrency: 0.2 }
419 new TypeError('Invalid worker node tasks concurrency: must be an integer')
// Same three invalid values (0, -1, 0.2), this time for the queue size.
425 './tests/worker-files/thread/testWorker.mjs',
427 enableTasksQueue: true,
428 tasksQueueOptions: { size: 0 }
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
440 './tests/worker-files/thread/testWorker.mjs',
442 enableTasksQueue: true,
443 tasksQueueOptions: { size: -1 }
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
455 './tests/worker-files/thread/testWorker.mjs',
457 enableTasksQueue: true,
458 tasksQueueOptions: { size: 0.2 }
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Exercises pool.setWorkerChoiceStrategyOptions() on a FAIR_SHARE pool in three phases:
// initial state, runTime/elu median switched on, then switched off again.
// After each phase the pool options, the strategy context options and every strategy's options are re-checked.
// Invalid arguments are covered at the end.
466 it('Verify that pool worker choice strategy options can be set', async () => {
467 const pool = new FixedThreadPool(
469 './tests/worker-files/thread/testWorker.mjs',
470 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
// Phase 1: nothing set yet, so the pool-level options are undefined while the context reports median: false everywhere.
472 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
473 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
476 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
477 runTime: { median: false },
478 waitTime: { median: false },
479 elu: { median: false },
480 weights: expect.objectContaining({
481 0: expect.any(Number),
482 [pool.info.maxSize - 1]: expect.any(Number)
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual(
488 expect.objectContaining({
491 Object.keys(workerChoiceStrategy.opts.weights).length,
492 runTime: { median: false },
493 waitTime: { median: false },
494 elu: { median: false }
499 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Phase 2: enable the median for runTime and elu; waitTime is left untouched and stays at median: false.
517 pool.setWorkerChoiceStrategyOptions({
518 runTime: { median: true },
519 elu: { median: true }
521 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
522 runTime: { median: true },
523 elu: { median: true }
525 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
528 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
529 runTime: { median: true },
530 waitTime: { median: false },
531 elu: { median: true },
532 weights: expect.objectContaining({
533 0: expect.any(Number),
534 [pool.info.maxSize - 1]: expect.any(Number)
537 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
538 .workerChoiceStrategies) {
539 expect(workerChoiceStrategy.opts).toStrictEqual(
540 expect.objectContaining({
543 Object.keys(workerChoiceStrategy.opts.weights).length,
544 runTime: { median: true },
545 waitTime: { median: false },
546 elu: { median: true }
551 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Phase 3: switch the median back off for runTime and elu.
569 pool.setWorkerChoiceStrategyOptions({
570 runTime: { median: false },
571 elu: { median: false }
573 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
574 runTime: { median: false },
575 elu: { median: false }
577 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
580 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
581 runTime: { median: false },
582 waitTime: { median: false },
583 elu: { median: false },
584 weights: expect.objectContaining({
585 0: expect.any(Number),
586 [pool.info.maxSize - 1]: expect.any(Number)
589 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
590 .workerChoiceStrategies) {
591 expect(workerChoiceStrategy.opts).toStrictEqual(
592 expect.objectContaining({
595 Object.keys(workerChoiceStrategy.opts.weights).length,
596 runTime: { median: false },
597 waitTime: { median: false },
598 elu: { median: false }
603 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Invalid arguments: a non-object value, an empty weights map and an unknown measurement.
622 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
625 'Invalid worker choice strategy options: must be a plain object'
628 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
630 'Invalid worker choice strategy options: must have a weight for each worker node'
634 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
637 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Toggles the tasks queue at runtime through pool.enableTasksQueue() and checks pool.opts after each call:
// disabled by default, enabled with default options, enabled with { concurrency: 2 }, then disabled again.
643 it('Verify that pool tasks queue can be enabled/disabled', async () => {
644 const pool = new FixedThreadPool(
646 './tests/worker-files/thread/testWorker.mjs'
648 expect(pool.opts.enableTasksQueue).toBe(false)
649 expect(pool.opts.tasksQueueOptions).toBeUndefined()
650 pool.enableTasksQueue(true)
651 expect(pool.opts.enableTasksQueue).toBe(true)
652 expect(pool.opts.tasksQueueOptions).toStrictEqual({
654 size: Math.pow(numberOfWorkers, 2),
656 tasksStealingOnBackPressure: true,
657 tasksFinishedTimeout: 2000
659 pool.enableTasksQueue(true, { concurrency: 2 })
660 expect(pool.opts.enableTasksQueue).toBe(true)
661 expect(pool.opts.tasksQueueOptions).toStrictEqual({
663 size: Math.pow(numberOfWorkers, 2),
665 tasksStealingOnBackPressure: true,
666 tasksFinishedTimeout: 2000
// Disabling the queue also clears tasksQueueOptions.
668 pool.enableTasksQueue(false)
669 expect(pool.opts.enableTasksQueue).toBe(false)
670 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Exercises pool.setTasksQueueOptions() on a pool created with the tasks queue enabled.
// After each update, every worker node's tasksQueueBackPressureSize is expected to equal pool.opts.tasksQueueOptions.size.
// Invalid arguments are covered at the end.
674 it('Verify that pool tasks queue options can be set', async () => {
675 const pool = new FixedThreadPool(
677 './tests/worker-files/thread/testWorker.mjs',
678 { enableTasksQueue: true }
680 expect(pool.opts.tasksQueueOptions).toStrictEqual({
682 size: Math.pow(numberOfWorkers, 2),
684 tasksStealingOnBackPressure: true,
685 tasksFinishedTimeout: 2000
687 for (const workerNode of pool.workerNodes) {
688 expect(workerNode.tasksQueueBackPressureSize).toBe(
689 pool.opts.tasksQueueOptions.size
692 pool.setTasksQueueOptions({
696 tasksStealingOnBackPressure: false,
697 tasksFinishedTimeout: 3000
699 expect(pool.opts.tasksQueueOptions).toStrictEqual({
703 tasksStealingOnBackPressure: false,
704 tasksFinishedTimeout: 3000
706 for (const workerNode of pool.workerNodes) {
707 expect(workerNode.tasksQueueBackPressureSize).toBe(
708 pool.opts.tasksQueueOptions.size
// Second update does not pass tasksFinishedTimeout in the visible lines; the expectation below has it back at 2000.
711 pool.setTasksQueueOptions({
714 tasksStealingOnBackPressure: true
716 expect(pool.opts.tasksQueueOptions).toStrictEqual({
718 size: Math.pow(numberOfWorkers, 2),
720 tasksStealingOnBackPressure: true,
721 tasksFinishedTimeout: 2000
723 for (const workerNode of pool.workerNodes) {
724 expect(workerNode.tasksQueueBackPressureSize).toBe(
725 pool.opts.tasksQueueOptions.size
// Invalid arguments: non-object value, then 0 / -1 / 0.2 for concurrency and for size.
728 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
729 new TypeError('Invalid tasks queue options: must be a plain object')
731 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
733 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
736 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
738 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
741 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
742 new TypeError('Invalid worker node tasks concurrency: must be an integer')
744 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
746 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
749 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
751 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
754 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
755 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Checks pool.info right after construction for a fixed thread pool and for a dynamic cluster pool.
760 it('Verify that pool info is set', async () => {
// Fixed pool: min size, max size, worker node count and idle worker node count all equal numberOfWorkers.
761 let pool = new FixedThreadPool(
763 './tests/worker-files/thread/testWorker.mjs'
765 expect(pool.info).toStrictEqual({
767 type: PoolTypes.fixed,
768 worker: WorkerTypes.thread,
771 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
772 minSize: numberOfWorkers,
773 maxSize: numberOfWorkers,
774 workerNodes: numberOfWorkers,
775 idleWorkerNodes: numberOfWorkers,
// Dynamic pool: created with a minimum of floor(numberOfWorkers / 2); the worker node counts are expected to match that minimum.
782 pool = new DynamicClusterPool(
783 Math.floor(numberOfWorkers / 2),
785 './tests/worker-files/cluster/testWorker.js'
787 expect(pool.info).toStrictEqual({
789 type: PoolTypes.dynamic,
790 worker: WorkerTypes.cluster,
793 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
794 minSize: Math.floor(numberOfWorkers / 2),
795 maxSize: numberOfWorkers,
796 workerNodes: Math.floor(numberOfWorkers / 2),
797 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
// Every worker node of a new FixedClusterPool is a WorkerNode whose usage object is compared with an expected initial state:
// sequentiallyStolen is 0 and each history is an empty CircularArray.
806 it('Verify that pool worker tasks usage are initialized', async () => {
807 const pool = new FixedClusterPool(
809 './tests/worker-files/cluster/testWorker.js'
811 for (const workerNode of pool.workerNodes) {
812 expect(workerNode).toBeInstanceOf(WorkerNode)
813 expect(workerNode.usage).toStrictEqual({
819 sequentiallyStolen: 0,
824 history: new CircularArray()
827 history: new CircularArray()
831 history: new CircularArray()
834 history: new CircularArray()
// For a fixed cluster pool and a dynamic thread pool, every worker node starts with a Deque tasks queue
// whose size and maxSize are both 0.
842 it('Verify that pool worker tasks queue are initialized', async () => {
843 let pool = new FixedClusterPool(
845 './tests/worker-files/cluster/testWorker.js'
847 for (const workerNode of pool.workerNodes) {
848 expect(workerNode).toBeInstanceOf(WorkerNode)
849 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
850 expect(workerNode.tasksQueue.size).toBe(0)
851 expect(workerNode.tasksQueue.maxSize).toBe(0)
854 pool = new DynamicThreadPool(
855 Math.floor(numberOfWorkers / 2),
857 './tests/worker-files/thread/testWorker.mjs'
859 for (const workerNode of pool.workerNodes) {
860 expect(workerNode).toBeInstanceOf(WorkerNode)
861 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
862 expect(workerNode.tasksQueue.size).toBe(0)
863 expect(workerNode.tasksQueue.maxSize).toBe(0)
// Checks workerNode.info for both worker kinds: a numeric id and a type matching the pool's worker type
// (cluster for FixedClusterPool, thread for DynamicThreadPool). Other expected fields are not visible in this excerpt.
868 it('Verify that pool worker info are initialized', async () => {
869 let pool = new FixedClusterPool(
871 './tests/worker-files/cluster/testWorker.js'
873 for (const workerNode of pool.workerNodes) {
874 expect(workerNode).toBeInstanceOf(WorkerNode)
875 expect(workerNode.info).toStrictEqual({
876 id: expect.any(Number),
877 type: WorkerTypes.cluster,
883 pool = new DynamicThreadPool(
884 Math.floor(numberOfWorkers / 2),
886 './tests/worker-files/thread/testWorker.mjs'
888 for (const workerNode of pool.workerNodes) {
889 expect(workerNode).toBeInstanceOf(WorkerNode)
890 expect(workerNode.info).toStrictEqual({
891 id: expect.any(Number),
892 type: WorkerTypes.thread,
// Lifecycle guards: start() on an already started pool throws,
// and destroy() rejects with "already destroyed" once info.started and info.ready are false.
// NOTE(review): the call that moves the pool to the not-started state is not visible in this excerpt — presumably a first destroy().
900 it('Verify that pool statuses are checked at start or destroy', async () => {
901 const pool = new FixedThreadPool(
903 './tests/worker-files/thread/testWorker.mjs'
905 expect(pool.info.started).toBe(true)
906 expect(pool.info.ready).toBe(true)
907 expect(() => pool.start()).toThrow(
908 new Error('Cannot start an already started pool')
911 expect(pool.info.started).toBe(false)
912 expect(pool.info.ready).toBe(false)
913 await expect(pool.destroy()).rejects.toThrow(
914 new Error('Cannot destroy an already destroyed pool')
// Deferred start: the pool begins not started, not ready and without worker nodes, and execute() rejects in that state.
// Afterwards it is expected to be started and ready, to have emitted the ready event and to hold numberOfWorkers worker nodes.
// NOTE(review): the constructor options and the start call are not visible in this excerpt.
918 it('Verify that pool can be started after initialization', async () => {
919 const pool = new FixedClusterPool(
921 './tests/worker-files/cluster/testWorker.js',
926 expect(pool.info.started).toBe(false)
927 expect(pool.info.ready).toBe(false)
928 expect(pool.readyEventEmitted).toBe(false)
929 expect(pool.workerNodes).toStrictEqual([])
930 await expect(pool.execute()).rejects.toThrow(
931 new Error('Cannot execute a task on not started pool')
934 expect(pool.info.started).toBe(true)
935 expect(pool.info.ready).toBe(true)
936 await waitPoolEvents(pool, PoolEvents.ready, 1)
937 expect(pool.readyEventEmitted).toBe(true)
938 expect(pool.workerNodes.length).toBe(numberOfWorkers)
939 for (const workerNode of pool.workerNodes) {
940 expect(workerNode).toBeInstanceOf(WorkerNode)
// Argument validation of pool.execute(data, name, transferList):
// non-string name, empty name and non-array transferList reject with TypeError.
945 it('Verify that pool execute() arguments are checked', async () => {
946 const pool = new FixedClusterPool(
948 './tests/worker-files/cluster/testWorker.js'
950 await expect(pool.execute(undefined, 0)).rejects.toThrow(
951 new TypeError('name argument must be a string')
953 await expect(pool.execute(undefined, '')).rejects.toThrow(
954 new TypeError('name argument must not be an empty string')
956 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
957 new TypeError('transferList argument must be an array')
// Unlike the cases above, an unknown task function name rejects with a plain string, hence rejects.toBe().
959 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
960 "Task function 'unknown' not found"
// NOTE(review): presumably the pool is destroyed just before this check — the call is not visible in this excerpt.
963 await expect(pool.execute()).rejects.toThrow(
964 new Error('Cannot execute a task on not started pool')
// Submits numberOfWorkers * maxMultiplier tasks and inspects each worker node's usage twice:
// while the tasks are pending (executing equals maxMultiplier) and after completion (executed equals maxMultiplier).
968 it('Verify that pool worker tasks usage are computed', async () => {
969 const pool = new FixedClusterPool(
971 './tests/worker-files/cluster/testWorker.js'
973 const promises = new Set()
974 const maxMultiplier = 2
975 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
976 promises.add(pool.execute())
// Checked before the promises are awaited.
978 for (const workerNode of pool.workerNodes) {
979 expect(workerNode.usage).toStrictEqual({
982 executing: maxMultiplier,
985 sequentiallyStolen: 0,
990 history: expect.any(CircularArray)
993 history: expect.any(CircularArray)
997 history: expect.any(CircularArray)
1000 history: expect.any(CircularArray)
1005 await Promise.all(promises)
1006 for (const workerNode of pool.workerNodes) {
1007 expect(workerNode.usage).toStrictEqual({
1009 executed: maxMultiplier,
1013 sequentiallyStolen: 0,
1018 history: expect.any(CircularArray)
1021 history: expect.any(CircularArray)
1025 history: expect.any(CircularArray)
1028 history: expect.any(CircularArray)
1033 await pool.destroy()
// Runs tasks on a dynamic thread pool, checks that each worker node executed between 1 and
// numberOfWorkers * maxMultiplier tasks, then switches the strategy to FAIR_SHARE and re-checks the usage objects.
1036 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1037 const pool = new DynamicThreadPool(
1038 Math.floor(numberOfWorkers / 2),
1040 './tests/worker-files/thread/testWorker.mjs'
1042 const promises = new Set()
1043 const maxMultiplier = 2
1044 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1045 promises.add(pool.execute())
1047 await Promise.all(promises)
1048 for (const workerNode of pool.workerNodes) {
1049 expect(workerNode.usage).toStrictEqual({
// The per-worker count is only pinned to "some number" here; its bounds are asserted separately below.
1051 executed: expect.any(Number),
1055 sequentiallyStolen: 0,
1060 history: expect.any(CircularArray)
1063 history: expect.any(CircularArray)
1067 history: expect.any(CircularArray)
1070 history: expect.any(CircularArray)
1074 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1075 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1076 numberOfWorkers * maxMultiplier
// The measurement histories are expected to be empty at this point.
1078 expect(workerNode.usage.runTime.history.length).toBe(0)
1079 expect(workerNode.usage.waitTime.history.length).toBe(0)
1080 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1081 expect(workerNode.usage.elu.active.history.length).toBe(0)
// Changing the strategy is the action under test.
1083 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1084 for (const workerNode of pool.workerNodes) {
1085 expect(workerNode.usage).toStrictEqual({
1091 sequentiallyStolen: 0,
1096 history: expect.any(CircularArray)
1099 history: expect.any(CircularArray)
1103 history: expect.any(CircularArray)
1106 history: expect.any(CircularArray)
1110 expect(workerNode.usage.runTime.history.length).toBe(0)
1111 expect(workerNode.usage.waitTime.history.length).toBe(0)
1112 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1113 expect(workerNode.usage.elu.active.history.length).toBe(0)
1115 await pool.destroy()
// Registers a listener for PoolEvents.ready on a dynamic cluster pool, waits for one ready event,
// then checks that the listener ran once and received the expected info shape.
1118 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1119 const pool = new DynamicClusterPool(
1120 Math.floor(numberOfWorkers / 2),
1122 './tests/worker-files/cluster/testWorker.js'
// No listener is registered before the test adds its own.
1124 expect(pool.emitter.eventNames()).toStrictEqual([])
1127 pool.emitter.on(PoolEvents.ready, info => {
1131 await waitPoolEvents(pool, PoolEvents.ready, 1)
1132 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1133 expect(poolReady).toBe(1)
1134 expect(poolInfo).toStrictEqual({
1136 type: PoolTypes.dynamic,
1137 worker: WorkerTypes.cluster,
1140 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1141 minSize: expect.any(Number),
1142 maxSize: expect.any(Number),
1143 workerNodes: expect.any(Number),
1144 idleWorkerNodes: expect.any(Number),
1145 busyWorkerNodes: expect.any(Number),
1146 executedTasks: expect.any(Number),
1147 executingTasks: expect.any(Number),
1148 failedTasks: expect.any(Number)
1150 await pool.destroy()
// Registers a listener for PoolEvents.busy on a fixed thread pool, submits numberOfWorkers * 2 tasks,
// then checks how many times the listener ran and the info shape it received.
1153 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1154 const pool = new FixedThreadPool(
1156 './tests/worker-files/thread/testWorker.mjs'
1158 expect(pool.emitter.eventNames()).toStrictEqual([])
1159 const promises = new Set()
1162 pool.emitter.on(PoolEvents.busy, info => {
1166 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1167 for (let i = 0; i < numberOfWorkers * 2; i++) {
1168 promises.add(pool.execute())
1170 await Promise.all(promises)
1171 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1172 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1173 expect(poolBusy).toBe(numberOfWorkers + 1)
1174 expect(poolInfo).toStrictEqual({
1176 type: PoolTypes.fixed,
1177 worker: WorkerTypes.thread,
1180 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1181 minSize: expect.any(Number),
1182 maxSize: expect.any(Number),
1183 workerNodes: expect.any(Number),
1184 idleWorkerNodes: expect.any(Number),
1185 busyWorkerNodes: expect.any(Number),
1186 executedTasks: expect.any(Number),
1187 executingTasks: expect.any(Number),
1188 failedTasks: expect.any(Number)
1190 await pool.destroy()
// Registers a listener for PoolEvents.full on a dynamic thread pool, submits numberOfWorkers * 2 tasks,
// and expects the listener to have run exactly once with the expected info shape.
1193 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1194 const pool = new DynamicThreadPool(
1195 Math.floor(numberOfWorkers / 2),
1197 './tests/worker-files/thread/testWorker.mjs'
1199 expect(pool.emitter.eventNames()).toStrictEqual([])
1200 const promises = new Set()
1203 pool.emitter.on(PoolEvents.full, info => {
1207 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1208 for (let i = 0; i < numberOfWorkers * 2; i++) {
1209 promises.add(pool.execute())
1211 await Promise.all(promises)
1212 expect(poolFull).toBe(1)
1213 expect(poolInfo).toStrictEqual({
1215 type: PoolTypes.dynamic,
1216 worker: WorkerTypes.thread,
1219 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1220 minSize: expect.any(Number),
1221 maxSize: expect.any(Number),
1222 workerNodes: expect.any(Number),
1223 idleWorkerNodes: expect.any(Number),
1224 busyWorkerNodes: expect.any(Number),
1225 executedTasks: expect.any(Number),
1226 executingTasks: expect.any(Number),
1227 failedTasks: expect.any(Number)
1229 await pool.destroy()
// Registers a listener for PoolEvents.backPressure on a fixed thread pool with the tasks queue enabled.
// Back pressure is forced by stubbing pool.hasBackPressure, so the event does not depend on real queue load.
1232 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1233 const pool = new FixedThreadPool(
1235 './tests/worker-files/thread/testWorker.mjs',
1237 enableTasksQueue: true
// sinon stub: hasBackPressure() always reports true from here on.
1240 stub(pool, 'hasBackPressure').returns(true)
1241 expect(pool.emitter.eventNames()).toStrictEqual([])
1242 const promises = new Set()
1243 let poolBackPressure = 0
1245 pool.emitter.on(PoolEvents.backPressure, info => {
1249 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1250 for (let i = 0; i < numberOfWorkers + 1; i++) {
1251 promises.add(pool.execute())
1253 await Promise.all(promises)
1254 expect(poolBackPressure).toBe(1)
// With the queue enabled, the info object also carries the queue-related counters.
1255 expect(poolInfo).toStrictEqual({
1257 type: PoolTypes.fixed,
1258 worker: WorkerTypes.thread,
1261 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1262 minSize: expect.any(Number),
1263 maxSize: expect.any(Number),
1264 workerNodes: expect.any(Number),
1265 idleWorkerNodes: expect.any(Number),
1266 busyWorkerNodes: expect.any(Number),
1267 executedTasks: expect.any(Number),
1268 executingTasks: expect.any(Number),
1269 maxQueuedTasks: expect.any(Number),
1270 queuedTasks: expect.any(Number),
1272 stolenTasks: expect.any(Number),
1273 failedTasks: expect.any(Number)
// Pins the number of times the pool consulted the stub during the run.
1275 expect(pool.hasBackPressure.callCount).toBe(5)
1276 await pool.destroy()
// Uses the async worker with a 2500 ms tasksFinishedTimeout and counts 'taskFinished' events per worker node.
// destroy() is timed: all submitted tasks must have finished, and the elapsed time must lie
// between 2000 ms and tasksFinishedTimeout + 100 ms.
1279 it('Verify that destroy() waits for queued tasks to finish', async () => {
1280 const tasksFinishedTimeout = 2500
1281 const pool = new FixedThreadPool(
1283 './tests/worker-files/thread/asyncWorker.mjs',
1285 enableTasksQueue: true,
1286 tasksQueueOptions: { tasksFinishedTimeout }
1289 const maxMultiplier = 4
1290 let tasksFinished = 0
1291 for (const workerNode of pool.workerNodes) {
1292 workerNode.on('taskFinished', () => {
1296 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
// Precondition: some tasks are still queued when destroy() is called.
1299 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1300 const startTime = performance.now()
1301 await pool.destroy()
1302 const elapsedTime = performance.now() - startTime
1303 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1304 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1305 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
// Same setup as the queued-tasks destroy test but with a 1000 ms tasksFinishedTimeout.
// Here no task is expected to have finished, and destroy() must return within tasksFinishedTimeout + 600 ms.
1308 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1309 const tasksFinishedTimeout = 1000
1310 const pool = new FixedThreadPool(
1312 './tests/worker-files/thread/asyncWorker.mjs',
1314 enableTasksQueue: true,
1315 tasksQueueOptions: { tasksFinishedTimeout }
1318 const maxMultiplier = 4
1319 let tasksFinished = 0
1320 for (const workerNode of pool.workerNodes) {
1321 workerNode.on('taskFinished', () => {
1325 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1328 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1329 const startTime = performance.now()
1330 await pool.destroy()
1331 const elapsedTime = performance.now() - startTime
1332 expect(tasksFinished).toBe(0)
1333 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
// Installs an async_hooks hook that watches for an async resource of type 'poolifier:task',
// runs a single task, and expects the init, before, after and resolve counters to each be exactly 1.
1336 it('Verify that pool asynchronous resource track tasks execution', async () => {
1341 let resolveCalls = 0
1342 const hook = createHook({
1343 init (asyncId, type) {
// Remember the id of the task resource so the other counters can filter on it.
1344 if (type === 'poolifier:task') {
1346 taskAsyncId = asyncId
1350 if (asyncId === taskAsyncId) beforeCalls++
1353 if (asyncId === taskAsyncId) afterCalls++
// This counter compares against the currently executing async id rather than the id passed to the callback.
1356 if (executionAsyncId() === taskAsyncId) resolveCalls++
1359 const pool = new FixedThreadPool(
1361 './tests/worker-files/thread/testWorker.mjs'
1364 await pool.execute()
1366 expect(initCalls).toBe(1)
1367 expect(beforeCalls).toBe(1)
1368 expect(afterCalls).toBe(1)
1369 expect(resolveCalls).toBe(1)
1370 await pool.destroy()
// Checks hasTaskFunction() on a dynamic thread pool and a fixed cluster pool, both using the multiple-task-functions worker:
// the default task name, 'factorial' and 'fibonacci' are found, 'unknown' is not.
1373 it('Verify that hasTaskFunction() is working', async () => {
1374 const dynamicThreadPool = new DynamicThreadPool(
1375 Math.floor(numberOfWorkers / 2),
1377 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait for the ready event before querying the task functions.
1379 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1380 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1381 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1384 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1385 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1386 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1387 await dynamicThreadPool.destroy()
1388 const fixedClusterPool = new FixedClusterPool(
1390 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1392 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1393 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1394 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1397 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1398 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1399 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1400 await fixedClusterPool.destroy()
// Covers addTaskFunction() on a dynamic thread pool: argument validation first, then registering an 'echo'
// task function, executing it and checking the per-task-function usage recorded on each worker node.
1403 it('Verify that addTaskFunction() is working', async () => {
1404 const dynamicThreadPool = new DynamicThreadPool(
1405 Math.floor(numberOfWorkers / 2),
1407 './tests/worker-files/thread/testWorker.mjs'
1409 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid arguments: non-string name, empty name, non-function fn (0 and '').
1411 dynamicThreadPool.addTaskFunction(0, () => {})
1412 ).rejects.toThrow(new TypeError('name argument must be a string'))
1414 dynamicThreadPool.addTaskFunction('', () => {})
1416 new TypeError('name argument must not be an empty string')
1418 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1419 new TypeError('fn argument must be a function')
1421 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1422 new TypeError('fn argument must be a function')
1424 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1428 const echoTaskFunction = data => {
// A successful registration resolves to true and is tracked in the pool's taskFunctions map.
1432 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1433 ).resolves.toBe(true)
1434 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1435 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1438 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// The result of executing 'echo' is expected to equal the data that was sent.
1443 const taskFunctionData = { test: 'test' }
1444 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1445 expect(echoResult).toStrictEqual(taskFunctionData)
1446 for (const workerNode of dynamicThreadPool.workerNodes) {
1447 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1449 executed: expect.any(Number),
1452 sequentiallyStolen: 0,
1457 history: new CircularArray()
1460 history: new CircularArray()
1464 history: new CircularArray()
1467 history: new CircularArray()
1472 await dynamicThreadPool.destroy()
// Checks removeTaskFunction(): removal of a name unknown to the pool is
// rejected, while a task function added through the pool can be removed.
1475 it('Verify that removeTaskFunction() is working', async () => {
1476 const dynamicThreadPool = new DynamicThreadPool(
1477 Math.floor(numberOfWorkers / 2),
1479 './tests/worker-files/thread/testWorker.mjs'
1481 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1482 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' was not added via the pool in this test, hence the rejection.
1486 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1487 new Error('Cannot remove a task function not handled on the pool side')
1489 const echoTaskFunction = data => {
// Register 'echo' so that there is something removable.
1492 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1493 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1494 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1497 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1502 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
// After removal the pool side registry must be empty again.
1505 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1506 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1507 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1511 await dynamicThreadPool.destroy()
// Checks listTaskFunctionNames() on a dynamic thread pool and on a fixed
// cluster pool, each started from a multiple-task-functions worker file.
1514 it('Verify that listTaskFunctionNames() is working', async () => {
1515 const dynamicThreadPool = new DynamicThreadPool(
1516 Math.floor(numberOfWorkers / 2),
1518 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1520 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Strict equality: the returned array is compared element by element, in order.
1521 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1523 'jsonIntegerSerialization',
1527 await dynamicThreadPool.destroy()
// Same check on the cluster flavour.
1528 const fixedClusterPool = new FixedClusterPool(
1530 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1532 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1533 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1535 'jsonIntegerSerialization',
1539 await fixedClusterPool.destroy()
// Checks setDefaultTaskFunction(): three failing calls whose error message
// embeds the worker id, then two successful switches of the default task
// function, with listTaskFunctionNames() inspected after each step.
1542 it('Verify that setDefaultTaskFunction() is working', async () => {
1543 const dynamicThreadPool = new DynamicThreadPool(
1544 Math.floor(numberOfWorkers / 2),
1546 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1548 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated in the expected messages.
1549 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Failure 1: the name is not a string.
1550 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1552 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Failure 2: the reserved default task name cannot be set as the default.
1556 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1559 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Failure 3: the name does not match any existing task function.
1563 dynamicThreadPool.setDefaultTaskFunction('unknown')
1566 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1569 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1571 'jsonIntegerSerialization',
// Success: existing task functions can be promoted to default.
1576 dynamicThreadPool.setDefaultTaskFunction('factorial')
1577 ).resolves.toBe(true)
1578 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1581 'jsonIntegerSerialization',
1585 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1586 ).resolves.toBe(true)
1587 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1590 'jsonIntegerSerialization',
1593 await dynamicThreadPool.destroy()
// End-to-end check of a worker exposing several task functions: executes the
// default one and three named ones on a dynamic cluster pool, then inspects
// the pool counters and the per worker node, per task function usage.
1596 it('Verify that multiple task functions worker is working', async () => {
1597 const pool = new DynamicClusterPool(
1598 Math.floor(numberOfWorkers / 2),
1600 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1602 const data = { n: 10 }
// No name given: the default task function is the one executed.
1603 const result0 = await pool.execute(data)
1604 expect(result0).toStrictEqual({ ok: 1 })
1605 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1606 expect(result1).toStrictEqual({ ok: 1 })
// 10! = 3628800
1607 const result2 = await pool.execute(data, 'factorial')
1608 expect(result2).toBe(3628800)
// fibonacci(10) = 55
1609 const result3 = await pool.execute(data, 'fibonacci')
1610 expect(result3).toBe(55)
// All four tasks are done: none still executing, four accounted as executed.
1611 expect(pool.info.executingTasks).toBe(0)
1612 expect(pool.info.executedTasks).toBe(4)
1613 for (const workerNode of pool.workerNodes) {
1614 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1616 'jsonIntegerSerialization',
1620 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Usage shape check for each task function name known to the pool.
1621 for (const name of pool.listTaskFunctionNames()) {
1622 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1624 executed: expect.any(Number),
1628 sequentiallyStolen: 0,
1632 history: expect.any(CircularArray)
1635 history: expect.any(CircularArray)
1639 history: expect.any(CircularArray)
1642 history: expect.any(CircularArray)
1647 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1648 ).toBeGreaterThan(0)
// NOTE(review): the usage fetched under DEFAULT_TASK_NAME appears to be
// compared with the usage of the second listed task function name — confirm.
1651 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1653 workerNode.getTaskFunctionWorkerUsage(
1654 workerNode.info.taskFunctionNames[1]
1658 await pool.destroy()
// Checks sendKillMessageToWorker(): resolves with undefined for worker node
// key 0 and rejects with an 'Invalid worker node key' error for the key
// `numberOfWorkers`.
1661 it('Verify sendKillMessageToWorker()', async () => {
1662 const pool = new DynamicClusterPool(
1663 Math.floor(numberOfWorkers / 2),
1665 './tests/worker-files/cluster/testWorker.js'
1667 const workerNodeKey = 0
1669 pool.sendKillMessageToWorker(workerNodeKey)
1670 ).resolves.toBeUndefined()
// numberOfWorkers is used here as a key expected to be invalid.
1672 pool.sendKillMessageToWorker(numberOfWorkers)
1673 ).rejects.toStrictEqual(
1674 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1676 await pool.destroy()
// Checks sendTaskFunctionOperationToWorker(): sends an 'add' operation for a
// task function named 'empty' to worker node key 0 and verifies that the
// worker node info lists the new name afterwards.
1679 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1680 const pool = new DynamicClusterPool(
1681 Math.floor(numberOfWorkers / 2),
1683 './tests/worker-files/cluster/testWorker.js'
1685 const workerNodeKey = 0
1687 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1688 taskFunctionOperation: 'add',
1689 taskFunctionName: 'empty',
// The function is passed as its source text, not as a function object.
1690 taskFunction: (() => {}).toString()
1692 ).resolves.toBe(true)
// 'empty' is expected after the names already listed for the worker.
1694 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1695 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1696 await pool.destroy()
// Checks sendTaskFunctionOperationToWorkers(): sends an 'add' operation for a
// task function named 'empty' without targeting a worker node key, then
// inspects the task function names of every worker node.
1699 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1700 const pool = new DynamicClusterPool(
1701 Math.floor(numberOfWorkers / 2),
1703 './tests/worker-files/cluster/testWorker.js'
1706 pool.sendTaskFunctionOperationToWorkers({
1707 taskFunctionOperation: 'add',
1708 taskFunctionName: 'empty',
// The function is passed as its source text, not as a function object.
1709 taskFunction: (() => {}).toString()
1711 ).resolves.toBe(true)
1712 for (const workerNode of pool.workerNodes) {
1713 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1719 await pool.destroy()