1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual({
248 Object.keys(workerChoiceStrategy.opts.weights).length,
249 runTime: { median: false },
250 waitTime: { median: false },
251 elu: { median: false },
252 weights: expect.objectContaining({
253 0: expect.any(Number),
254 [pool.info.maxSize - 1]: expect.any(Number)
259 const testHandler = () => console.info('test handler executed')
260 pool = new FixedThreadPool(
262 './tests/worker-files/thread/testWorker.mjs',
264 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
265 workerChoiceStrategyOptions: {
266 runTime: { median: true },
267 weights: { 0: 300, 1: 200 }
270 restartWorkerOnError: false,
271 enableTasksQueue: true,
272 tasksQueueOptions: { concurrency: 2 },
273 messageHandler: testHandler,
274 errorHandler: testHandler,
275 onlineHandler: testHandler,
276 exitHandler: testHandler
279 expect(pool.emitter).toBeUndefined()
280 expect(pool.opts).toStrictEqual({
283 restartWorkerOnError: false,
284 enableTasksQueue: true,
287 size: Math.pow(numberOfWorkers, 2),
289 tasksStealingOnBackPressure: true,
290 tasksFinishedTimeout: 2000
292 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
293 workerChoiceStrategyOptions: {
294 runTime: { median: true },
295 weights: { 0: 300, 1: 200 }
297 onlineHandler: testHandler,
298 messageHandler: testHandler,
299 errorHandler: testHandler,
300 exitHandler: testHandler
302 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
305 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
306 runTime: { median: true },
307 waitTime: { median: false },
308 elu: { median: false },
309 weights: { 0: 300, 1: 200 }
311 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
312 .workerChoiceStrategies) {
313 expect(workerChoiceStrategy.opts).toStrictEqual({
316 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
317 runTime: { median: true },
318 waitTime: { median: false },
319 elu: { median: false },
320 weights: { 0: 300, 1: 200 }
326 it('Verify that pool options are validated', () => {
331 './tests/worker-files/thread/testWorker.mjs',
333 workerChoiceStrategy: 'invalidStrategy'
336 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
341 './tests/worker-files/thread/testWorker.mjs',
343 workerChoiceStrategyOptions: { weights: {} }
348 'Invalid worker choice strategy options: must have a weight for each worker node'
355 './tests/worker-files/thread/testWorker.mjs',
357 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
362 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
369 './tests/worker-files/thread/testWorker.mjs',
371 enableTasksQueue: true,
372 tasksQueueOptions: 'invalidTasksQueueOptions'
376 new TypeError('Invalid tasks queue options: must be a plain object')
382 './tests/worker-files/thread/testWorker.mjs',
384 enableTasksQueue: true,
385 tasksQueueOptions: { concurrency: 0 }
390 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
397 './tests/worker-files/thread/testWorker.mjs',
399 enableTasksQueue: true,
400 tasksQueueOptions: { concurrency: -1 }
405 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
412 './tests/worker-files/thread/testWorker.mjs',
414 enableTasksQueue: true,
415 tasksQueueOptions: { concurrency: 0.2 }
419 new TypeError('Invalid worker node tasks concurrency: must be an integer')
425 './tests/worker-files/thread/testWorker.mjs',
427 enableTasksQueue: true,
428 tasksQueueOptions: { size: 0 }
433 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
440 './tests/worker-files/thread/testWorker.mjs',
442 enableTasksQueue: true,
443 tasksQueueOptions: { size: -1 }
448 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
455 './tests/worker-files/thread/testWorker.mjs',
457 enableTasksQueue: true,
458 tasksQueueOptions: { size: 0.2 }
462 new TypeError('Invalid worker node tasks queue size: must be an integer')
466 it('Verify that pool worker choice strategy options can be set', async () => {
467 const pool = new FixedThreadPool(
469 './tests/worker-files/thread/testWorker.mjs',
470 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
472 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
473 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
476 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
477 runTime: { median: false },
478 waitTime: { median: false },
479 elu: { median: false },
480 weights: expect.objectContaining({
481 0: expect.any(Number),
482 [pool.info.maxSize - 1]: expect.any(Number)
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual({
490 Object.keys(workerChoiceStrategy.opts.weights).length,
491 runTime: { median: false },
492 waitTime: { median: false },
493 elu: { median: false },
494 weights: expect.objectContaining({
495 0: expect.any(Number),
496 [pool.info.maxSize - 1]: expect.any(Number)
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: true },
521 elu: { median: true }
523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
530 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
531 runTime: { median: true },
532 waitTime: { median: false },
533 elu: { median: true },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
539 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
540 .workerChoiceStrategies) {
541 expect(workerChoiceStrategy.opts).toStrictEqual({
544 Object.keys(workerChoiceStrategy.opts.weights).length,
545 runTime: { median: true },
546 waitTime: { median: false },
547 elu: { median: true },
548 weights: expect.objectContaining({
549 0: expect.any(Number),
550 [pool.info.maxSize - 1]: expect.any(Number)
555 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
573 pool.setWorkerChoiceStrategyOptions({
574 runTime: { median: false },
575 elu: { median: false }
577 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
578 runTime: { median: false },
579 elu: { median: false }
581 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
584 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
585 runTime: { median: false },
586 waitTime: { median: false },
587 elu: { median: false },
588 weights: expect.objectContaining({
589 0: expect.any(Number),
590 [pool.info.maxSize - 1]: expect.any(Number)
593 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
594 .workerChoiceStrategies) {
595 expect(workerChoiceStrategy.opts).toStrictEqual({
598 Object.keys(workerChoiceStrategy.opts.weights).length,
599 runTime: { median: false },
600 waitTime: { median: false },
601 elu: { median: false },
602 weights: expect.objectContaining({
603 0: expect.any(Number),
604 [pool.info.maxSize - 1]: expect.any(Number)
609 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
628 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
631 'Invalid worker choice strategy options: must be a plain object'
634 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
636 'Invalid worker choice strategy options: must have a weight for each worker node'
640 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
643 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
649 it('Verify that pool tasks queue can be enabled/disabled', async () => {
650 const pool = new FixedThreadPool(
652 './tests/worker-files/thread/testWorker.mjs'
654 expect(pool.opts.enableTasksQueue).toBe(false)
655 expect(pool.opts.tasksQueueOptions).toBeUndefined()
656 pool.enableTasksQueue(true)
657 expect(pool.opts.enableTasksQueue).toBe(true)
658 expect(pool.opts.tasksQueueOptions).toStrictEqual({
660 size: Math.pow(numberOfWorkers, 2),
662 tasksStealingOnBackPressure: true,
663 tasksFinishedTimeout: 2000
665 pool.enableTasksQueue(true, { concurrency: 2 })
666 expect(pool.opts.enableTasksQueue).toBe(true)
667 expect(pool.opts.tasksQueueOptions).toStrictEqual({
669 size: Math.pow(numberOfWorkers, 2),
671 tasksStealingOnBackPressure: true,
672 tasksFinishedTimeout: 2000
674 pool.enableTasksQueue(false)
675 expect(pool.opts.enableTasksQueue).toBe(false)
676 expect(pool.opts.tasksQueueOptions).toBeUndefined()
680 it('Verify that pool tasks queue options can be set', async () => {
681 const pool = new FixedThreadPool(
683 './tests/worker-files/thread/testWorker.mjs',
684 { enableTasksQueue: true }
686 expect(pool.opts.tasksQueueOptions).toStrictEqual({
688 size: Math.pow(numberOfWorkers, 2),
690 tasksStealingOnBackPressure: true,
691 tasksFinishedTimeout: 2000
693 for (const workerNode of pool.workerNodes) {
694 expect(workerNode.tasksQueueBackPressureSize).toBe(
695 pool.opts.tasksQueueOptions.size
698 pool.setTasksQueueOptions({
702 tasksStealingOnBackPressure: false,
703 tasksFinishedTimeout: 3000
705 expect(pool.opts.tasksQueueOptions).toStrictEqual({
709 tasksStealingOnBackPressure: false,
710 tasksFinishedTimeout: 3000
712 for (const workerNode of pool.workerNodes) {
713 expect(workerNode.tasksQueueBackPressureSize).toBe(
714 pool.opts.tasksQueueOptions.size
717 pool.setTasksQueueOptions({
720 tasksStealingOnBackPressure: true
722 expect(pool.opts.tasksQueueOptions).toStrictEqual({
724 size: Math.pow(numberOfWorkers, 2),
726 tasksStealingOnBackPressure: true,
727 tasksFinishedTimeout: 2000
729 for (const workerNode of pool.workerNodes) {
730 expect(workerNode.tasksQueueBackPressureSize).toBe(
731 pool.opts.tasksQueueOptions.size
734 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
735 new TypeError('Invalid tasks queue options: must be a plain object')
737 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
739 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
742 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
744 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
747 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
748 new TypeError('Invalid worker node tasks concurrency: must be an integer')
750 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
752 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
755 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
757 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
760 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
761 new TypeError('Invalid worker node tasks queue size: must be an integer')
766 it('Verify that pool info is set', async () => {
767 let pool = new FixedThreadPool(
769 './tests/worker-files/thread/testWorker.mjs'
771 expect(pool.info).toStrictEqual({
773 type: PoolTypes.fixed,
774 worker: WorkerTypes.thread,
777 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
778 minSize: numberOfWorkers,
779 maxSize: numberOfWorkers,
780 workerNodes: numberOfWorkers,
781 idleWorkerNodes: numberOfWorkers,
788 pool = new DynamicClusterPool(
789 Math.floor(numberOfWorkers / 2),
791 './tests/worker-files/cluster/testWorker.js'
793 expect(pool.info).toStrictEqual({
795 type: PoolTypes.dynamic,
796 worker: WorkerTypes.cluster,
799 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
800 minSize: Math.floor(numberOfWorkers / 2),
801 maxSize: numberOfWorkers,
802 workerNodes: Math.floor(numberOfWorkers / 2),
803 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
812 it('Verify that pool worker tasks usage are initialized', async () => {
813 const pool = new FixedClusterPool(
815 './tests/worker-files/cluster/testWorker.js'
817 for (const workerNode of pool.workerNodes) {
818 expect(workerNode).toBeInstanceOf(WorkerNode)
819 expect(workerNode.usage).toStrictEqual({
825 sequentiallyStolen: 0,
830 history: new CircularArray()
833 history: new CircularArray()
837 history: new CircularArray()
840 history: new CircularArray()
848 it('Verify that pool worker tasks queue are initialized', async () => {
849 let pool = new FixedClusterPool(
851 './tests/worker-files/cluster/testWorker.js'
853 for (const workerNode of pool.workerNodes) {
854 expect(workerNode).toBeInstanceOf(WorkerNode)
855 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
856 expect(workerNode.tasksQueue.size).toBe(0)
857 expect(workerNode.tasksQueue.maxSize).toBe(0)
860 pool = new DynamicThreadPool(
861 Math.floor(numberOfWorkers / 2),
863 './tests/worker-files/thread/testWorker.mjs'
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
868 expect(workerNode.tasksQueue.size).toBe(0)
869 expect(workerNode.tasksQueue.maxSize).toBe(0)
874 it('Verify that pool worker info are initialized', async () => {
875 let pool = new FixedClusterPool(
877 './tests/worker-files/cluster/testWorker.js'
879 for (const workerNode of pool.workerNodes) {
880 expect(workerNode).toBeInstanceOf(WorkerNode)
881 expect(workerNode.info).toStrictEqual({
882 id: expect.any(Number),
883 type: WorkerTypes.cluster,
890 pool = new DynamicThreadPool(
891 Math.floor(numberOfWorkers / 2),
893 './tests/worker-files/thread/testWorker.mjs'
895 for (const workerNode of pool.workerNodes) {
896 expect(workerNode).toBeInstanceOf(WorkerNode)
897 expect(workerNode.info).toStrictEqual({
898 id: expect.any(Number),
899 type: WorkerTypes.thread,
908 it('Verify that pool statuses are checked at start or destroy', async () => {
909 const pool = new FixedThreadPool(
911 './tests/worker-files/thread/testWorker.mjs'
913 expect(pool.info.started).toBe(true)
914 expect(pool.info.ready).toBe(true)
915 expect(() => pool.start()).toThrow(
916 new Error('Cannot start an already started pool')
919 expect(pool.info.started).toBe(false)
920 expect(pool.info.ready).toBe(false)
921 await expect(pool.destroy()).rejects.toThrow(
922 new Error('Cannot destroy an already destroyed pool')
926 it('Verify that pool can be started after initialization', async () => {
927 const pool = new FixedClusterPool(
929 './tests/worker-files/cluster/testWorker.js',
934 expect(pool.info.started).toBe(false)
935 expect(pool.info.ready).toBe(false)
936 expect(pool.readyEventEmitted).toBe(false)
937 expect(pool.workerNodes).toStrictEqual([])
938 await expect(pool.execute()).rejects.toThrow(
939 new Error('Cannot execute a task on not started pool')
942 expect(pool.info.started).toBe(true)
943 expect(pool.info.ready).toBe(true)
944 await waitPoolEvents(pool, PoolEvents.ready, 1)
945 expect(pool.readyEventEmitted).toBe(true)
946 expect(pool.workerNodes.length).toBe(numberOfWorkers)
947 for (const workerNode of pool.workerNodes) {
948 expect(workerNode).toBeInstanceOf(WorkerNode)
953 it('Verify that pool execute() arguments are checked', async () => {
954 const pool = new FixedClusterPool(
956 './tests/worker-files/cluster/testWorker.js'
958 await expect(pool.execute(undefined, 0)).rejects.toThrow(
959 new TypeError('name argument must be a string')
961 await expect(pool.execute(undefined, '')).rejects.toThrow(
962 new TypeError('name argument must not be an empty string')
964 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
965 new TypeError('transferList argument must be an array')
967 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
968 "Task function 'unknown' not found"
971 await expect(pool.execute()).rejects.toThrow(
972 new Error('Cannot execute a task on not started pool')
976 it('Verify that pool worker tasks usage are computed', async () => {
977 const pool = new FixedClusterPool(
979 './tests/worker-files/cluster/testWorker.js'
981 const promises = new Set()
982 const maxMultiplier = 2
983 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
984 promises.add(pool.execute())
986 for (const workerNode of pool.workerNodes) {
987 expect(workerNode.usage).toStrictEqual({
990 executing: maxMultiplier,
993 sequentiallyStolen: 0,
998 history: expect.any(CircularArray)
1001 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1008 history: expect.any(CircularArray)
1013 await Promise.all(promises)
1014 for (const workerNode of pool.workerNodes) {
1015 expect(workerNode.usage).toStrictEqual({
1017 executed: maxMultiplier,
1021 sequentiallyStolen: 0,
1026 history: expect.any(CircularArray)
1029 history: expect.any(CircularArray)
1033 history: expect.any(CircularArray)
1036 history: expect.any(CircularArray)
1041 await pool.destroy()
1044 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1045 const pool = new DynamicThreadPool(
1046 Math.floor(numberOfWorkers / 2),
1048 './tests/worker-files/thread/testWorker.mjs'
1050 const promises = new Set()
1051 const maxMultiplier = 2
1052 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1053 promises.add(pool.execute())
1055 await Promise.all(promises)
1056 for (const workerNode of pool.workerNodes) {
1057 expect(workerNode.usage).toStrictEqual({
1059 executed: expect.any(Number),
1063 sequentiallyStolen: 0,
1068 history: expect.any(CircularArray)
1071 history: expect.any(CircularArray)
1075 history: expect.any(CircularArray)
1078 history: expect.any(CircularArray)
1082 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1083 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1084 numberOfWorkers * maxMultiplier
1086 expect(workerNode.usage.runTime.history.length).toBe(0)
1087 expect(workerNode.usage.waitTime.history.length).toBe(0)
1088 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1089 expect(workerNode.usage.elu.active.history.length).toBe(0)
1091 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1092 for (const workerNode of pool.workerNodes) {
1093 expect(workerNode.usage).toStrictEqual({
1099 sequentiallyStolen: 0,
1104 history: expect.any(CircularArray)
1107 history: expect.any(CircularArray)
1111 history: expect.any(CircularArray)
1114 history: expect.any(CircularArray)
1118 expect(workerNode.usage.runTime.history.length).toBe(0)
1119 expect(workerNode.usage.waitTime.history.length).toBe(0)
1120 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1121 expect(workerNode.usage.elu.active.history.length).toBe(0)
1123 await pool.destroy()
1126 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1127 const pool = new DynamicClusterPool(
1128 Math.floor(numberOfWorkers / 2),
1130 './tests/worker-files/cluster/testWorker.js'
1132 expect(pool.emitter.eventNames()).toStrictEqual([])
1135 pool.emitter.on(PoolEvents.ready, info => {
1139 await waitPoolEvents(pool, PoolEvents.ready, 1)
1140 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1141 expect(poolReady).toBe(1)
1142 expect(poolInfo).toStrictEqual({
1144 type: PoolTypes.dynamic,
1145 worker: WorkerTypes.cluster,
1148 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1149 minSize: expect.any(Number),
1150 maxSize: expect.any(Number),
1151 workerNodes: expect.any(Number),
1152 idleWorkerNodes: expect.any(Number),
1153 busyWorkerNodes: expect.any(Number),
1154 executedTasks: expect.any(Number),
1155 executingTasks: expect.any(Number),
1156 failedTasks: expect.any(Number)
1158 await pool.destroy()
1161 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1162 const pool = new FixedThreadPool(
1164 './tests/worker-files/thread/testWorker.mjs'
1166 expect(pool.emitter.eventNames()).toStrictEqual([])
1167 const promises = new Set()
1170 pool.emitter.on(PoolEvents.busy, info => {
1174 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1175 for (let i = 0; i < numberOfWorkers * 2; i++) {
1176 promises.add(pool.execute())
1178 await Promise.all(promises)
1179 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1180 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1181 expect(poolBusy).toBe(numberOfWorkers + 1)
1182 expect(poolInfo).toStrictEqual({
1184 type: PoolTypes.fixed,
1185 worker: WorkerTypes.thread,
1188 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1189 minSize: expect.any(Number),
1190 maxSize: expect.any(Number),
1191 workerNodes: expect.any(Number),
1192 idleWorkerNodes: expect.any(Number),
1193 busyWorkerNodes: expect.any(Number),
1194 executedTasks: expect.any(Number),
1195 executingTasks: expect.any(Number),
1196 failedTasks: expect.any(Number)
1198 await pool.destroy()
1201 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1202 const pool = new DynamicThreadPool(
1203 Math.floor(numberOfWorkers / 2),
1205 './tests/worker-files/thread/testWorker.mjs'
1207 expect(pool.emitter.eventNames()).toStrictEqual([])
1208 const promises = new Set()
1211 pool.emitter.on(PoolEvents.full, info => {
1215 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1216 for (let i = 0; i < numberOfWorkers * 2; i++) {
1217 promises.add(pool.execute())
1219 await Promise.all(promises)
1220 expect(poolFull).toBe(1)
1221 expect(poolInfo).toStrictEqual({
1223 type: PoolTypes.dynamic,
1224 worker: WorkerTypes.thread,
1227 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1228 minSize: expect.any(Number),
1229 maxSize: expect.any(Number),
1230 workerNodes: expect.any(Number),
1231 idleWorkerNodes: expect.any(Number),
1232 busyWorkerNodes: expect.any(Number),
1233 executedTasks: expect.any(Number),
1234 executingTasks: expect.any(Number),
1235 failedTasks: expect.any(Number)
1237 await pool.destroy()
1240 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1241 const pool = new FixedThreadPool(
1243 './tests/worker-files/thread/testWorker.mjs',
1245 enableTasksQueue: true
1248 stub(pool, 'hasBackPressure').returns(true)
1249 expect(pool.emitter.eventNames()).toStrictEqual([])
1250 const promises = new Set()
1251 let poolBackPressure = 0
1253 pool.emitter.on(PoolEvents.backPressure, info => {
1257 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1258 for (let i = 0; i < numberOfWorkers + 1; i++) {
1259 promises.add(pool.execute())
1261 await Promise.all(promises)
1262 expect(poolBackPressure).toBe(1)
1263 expect(poolInfo).toStrictEqual({
1265 type: PoolTypes.fixed,
1266 worker: WorkerTypes.thread,
1269 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1270 minSize: expect.any(Number),
1271 maxSize: expect.any(Number),
1272 workerNodes: expect.any(Number),
1273 idleWorkerNodes: expect.any(Number),
1274 stealingWorkerNodes: expect.any(Number),
1275 busyWorkerNodes: expect.any(Number),
1276 executedTasks: expect.any(Number),
1277 executingTasks: expect.any(Number),
1278 maxQueuedTasks: expect.any(Number),
1279 queuedTasks: expect.any(Number),
1281 stolenTasks: expect.any(Number),
1282 failedTasks: expect.any(Number)
1284 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1285 await pool.destroy()
1288 it('Verify that destroy() waits for queued tasks to finish', async () => {
1289 const tasksFinishedTimeout = 2500
1290 const pool = new FixedThreadPool(
1292 './tests/worker-files/thread/asyncWorker.mjs',
1294 enableTasksQueue: true,
1295 tasksQueueOptions: { tasksFinishedTimeout }
1298 const maxMultiplier = 4
1299 let tasksFinished = 0
1300 for (const workerNode of pool.workerNodes) {
1301 workerNode.on('taskFinished', () => {
1305 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1308 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1309 const startTime = performance.now()
1310 await pool.destroy()
1311 const elapsedTime = performance.now() - startTime
1312 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1313 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1314 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
1317 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1318 const tasksFinishedTimeout = 1000
1319 const pool = new FixedThreadPool(
1321 './tests/worker-files/thread/asyncWorker.mjs',
1323 enableTasksQueue: true,
1324 tasksQueueOptions: { tasksFinishedTimeout }
1327 const maxMultiplier = 4
1328 let tasksFinished = 0
1329 for (const workerNode of pool.workerNodes) {
1330 workerNode.on('taskFinished', () => {
1334 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1337 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1338 const startTime = performance.now()
1339 await pool.destroy()
1340 const elapsedTime = performance.now() - startTime
1341 expect(tasksFinished).toBe(0)
1342 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1345 it('Verify that pool asynchronous resource track tasks execution', async () => {
1350 let resolveCalls = 0
1351 const hook = createHook({
1352 init (asyncId, type) {
1353 if (type === 'poolifier:task') {
1355 taskAsyncId = asyncId
1359 if (asyncId === taskAsyncId) beforeCalls++
1362 if (asyncId === taskAsyncId) afterCalls++
1365 if (executionAsyncId() === taskAsyncId) resolveCalls++
1368 const pool = new FixedThreadPool(
1370 './tests/worker-files/thread/testWorker.mjs'
1373 await pool.execute()
1375 expect(initCalls).toBe(1)
1376 expect(beforeCalls).toBe(1)
1377 expect(afterCalls).toBe(1)
1378 expect(resolveCalls).toBe(1)
1379 await pool.destroy()
1382 it('Verify that hasTaskFunction() is working', async () => {
1383 const dynamicThreadPool = new DynamicThreadPool(
1384 Math.floor(numberOfWorkers / 2),
1386 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1388 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1389 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1390 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1393 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1394 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1395 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1396 await dynamicThreadPool.destroy()
1397 const fixedClusterPool = new FixedClusterPool(
1399 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1401 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1402 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1403 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1406 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1407 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1408 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1409 await fixedClusterPool.destroy()
1412 it('Verify that addTaskFunction() is working', async () => {
1413 const dynamicThreadPool = new DynamicThreadPool(
1414 Math.floor(numberOfWorkers / 2),
1416 './tests/worker-files/thread/testWorker.mjs'
1418 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1420 dynamicThreadPool.addTaskFunction(0, () => {})
1421 ).rejects.toThrow(new TypeError('name argument must be a string'))
1423 dynamicThreadPool.addTaskFunction('', () => {})
1425 new TypeError('name argument must not be an empty string')
1427 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1428 new TypeError('fn argument must be a function')
1430 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1431 new TypeError('fn argument must be a function')
1433 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1437 const echoTaskFunction = data => {
1441 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1442 ).resolves.toBe(true)
1443 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1444 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1447 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1452 const taskFunctionData = { test: 'test' }
1453 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1454 expect(echoResult).toStrictEqual(taskFunctionData)
1455 for (const workerNode of dynamicThreadPool.workerNodes) {
1456 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1458 executed: expect.any(Number),
1461 sequentiallyStolen: 0,
1466 history: new CircularArray()
1469 history: new CircularArray()
1473 history: new CircularArray()
1476 history: new CircularArray()
1481 await dynamicThreadPool.destroy()
1484 it('Verify that removeTaskFunction() is working', async () => {
1485 const dynamicThreadPool = new DynamicThreadPool(
1486 Math.floor(numberOfWorkers / 2),
1488 './tests/worker-files/thread/testWorker.mjs'
1490 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1491 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1495 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1496 new Error('Cannot remove a task function not handled on the pool side')
1498 const echoTaskFunction = data => {
1501 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1502 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1503 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1506 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1511 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1514 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1515 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1516 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1520 await dynamicThreadPool.destroy()
1523 it('Verify that listTaskFunctionNames() is working', async () => {
1524 const dynamicThreadPool = new DynamicThreadPool(
1525 Math.floor(numberOfWorkers / 2),
1527 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1529 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1530 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1532 'jsonIntegerSerialization',
1536 await dynamicThreadPool.destroy()
1537 const fixedClusterPool = new FixedClusterPool(
1539 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1541 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1542 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1544 'jsonIntegerSerialization',
1548 await fixedClusterPool.destroy()
1551 it('Verify that setDefaultTaskFunction() is working', async () => {
1552 const dynamicThreadPool = new DynamicThreadPool(
1553 Math.floor(numberOfWorkers / 2),
1555 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1557 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1558 const workerId = dynamicThreadPool.workerNodes[0].info.id
1559 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1561 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1565 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1568 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1572 dynamicThreadPool.setDefaultTaskFunction('unknown')
1575 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1578 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1580 'jsonIntegerSerialization',
1585 dynamicThreadPool.setDefaultTaskFunction('factorial')
1586 ).resolves.toBe(true)
1587 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1590 'jsonIntegerSerialization',
1594 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1595 ).resolves.toBe(true)
1596 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1599 'jsonIntegerSerialization',
1602 await dynamicThreadPool.destroy()
1605 it('Verify that multiple task functions worker is working', async () => {
1606 const pool = new DynamicClusterPool(
1607 Math.floor(numberOfWorkers / 2),
1609 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1611 const data = { n: 10 }
1612 const result0 = await pool.execute(data)
1613 expect(result0).toStrictEqual({ ok: 1 })
1614 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1615 expect(result1).toStrictEqual({ ok: 1 })
1616 const result2 = await pool.execute(data, 'factorial')
1617 expect(result2).toBe(3628800)
1618 const result3 = await pool.execute(data, 'fibonacci')
1619 expect(result3).toBe(55)
1620 expect(pool.info.executingTasks).toBe(0)
1621 expect(pool.info.executedTasks).toBe(4)
1622 for (const workerNode of pool.workerNodes) {
1623 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1625 'jsonIntegerSerialization',
1629 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1630 for (const name of pool.listTaskFunctionNames()) {
1631 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1633 executed: expect.any(Number),
1637 sequentiallyStolen: 0,
1641 history: expect.any(CircularArray)
1644 history: expect.any(CircularArray)
1648 history: expect.any(CircularArray)
1651 history: expect.any(CircularArray)
1656 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1657 ).toBeGreaterThan(0)
1660 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1662 workerNode.getTaskFunctionWorkerUsage(
1663 workerNode.info.taskFunctionNames[1]
1667 await pool.destroy()
1670 it('Verify sendKillMessageToWorker()', async () => {
1671 const pool = new DynamicClusterPool(
1672 Math.floor(numberOfWorkers / 2),
1674 './tests/worker-files/cluster/testWorker.js'
1676 const workerNodeKey = 0
1678 pool.sendKillMessageToWorker(workerNodeKey)
1679 ).resolves.toBeUndefined()
1680 await pool.destroy()
1683 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1684 const pool = new DynamicClusterPool(
1685 Math.floor(numberOfWorkers / 2),
1687 './tests/worker-files/cluster/testWorker.js'
1689 const workerNodeKey = 0
1691 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1692 taskFunctionOperation: 'add',
1693 taskFunctionName: 'empty',
1694 taskFunction: (() => {}).toString()
1696 ).resolves.toBe(true)
1698 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1699 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1700 await pool.destroy()
1703 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1704 const pool = new DynamicClusterPool(
1705 Math.floor(numberOfWorkers / 2),
1707 './tests/worker-files/cluster/testWorker.js'
1710 pool.sendTaskFunctionOperationToWorkers({
1711 taskFunctionOperation: 'add',
1712 taskFunctionName: 'empty',
1713 taskFunction: (() => {}).toString()
1715 ).resolves.toBe(true)
1716 for (const workerNode of pool.workerNodes) {
1717 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1723 await pool.destroy()