1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false },
253 weights: expect.objectContaining({
254 0: expect.any(Number),
255 [pool.info.maxSize - 1]: expect.any(Number)
261 const testHandler = () => console.info('test handler executed')
262 pool = new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.mjs',
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 weights: { 0: 300, 1: 200 }
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: { concurrency: 2 },
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
281 expect(pool.emitter).toBeUndefined()
282 expect(pool.opts).toStrictEqual({
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
289 size: Math.pow(numberOfWorkers, 2),
291 tasksStealingOnBackPressure: true,
292 tasksFinishedTimeout: 2000
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
296 runTime: { median: true },
297 weights: { 0: 300, 1: 200 }
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
311 weights: { 0: 300, 1: 200 }
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
328 it('Verify that pool options are validated', () => {
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategy: 'invalidStrategy'
338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { weights: {} }
350 'Invalid worker choice strategy options: must have a weight for each worker node'
357 './tests/worker-files/thread/testWorker.mjs',
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
378 new TypeError('Invalid tasks queue options: must be a plain object')
384 './tests/worker-files/thread/testWorker.mjs',
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { concurrency: -1 }
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
427 './tests/worker-files/thread/testWorker.mjs',
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 './tests/worker-files/thread/testWorker.mjs',
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 './tests/worker-files/thread/testWorker.mjs',
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
468 it('Verify that pool worker choice strategy options can be set', async () => {
469 const pool = new FixedThreadPool(
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual(
490 expect.objectContaining({
493 Object.keys(workerChoiceStrategy.opts.weights).length,
494 runTime: { median: false },
495 waitTime: { median: false },
496 elu: { median: false }
501 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
519 pool.setWorkerChoiceStrategyOptions({
520 runTime: { median: true },
521 elu: { median: true }
523 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
524 runTime: { median: true },
525 elu: { median: true }
527 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
530 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
531 runTime: { median: true },
532 waitTime: { median: false },
533 elu: { median: true },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
539 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
540 .workerChoiceStrategies) {
541 expect(workerChoiceStrategy.opts).toStrictEqual(
542 expect.objectContaining({
545 Object.keys(workerChoiceStrategy.opts.weights).length,
546 runTime: { median: true },
547 waitTime: { median: false },
548 elu: { median: true }
553 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
571 pool.setWorkerChoiceStrategyOptions({
572 runTime: { median: false },
573 elu: { median: false }
575 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
576 runTime: { median: false },
577 elu: { median: false }
579 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
582 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
583 runTime: { median: false },
584 waitTime: { median: false },
585 elu: { median: false },
586 weights: expect.objectContaining({
587 0: expect.any(Number),
588 [pool.info.maxSize - 1]: expect.any(Number)
591 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
592 .workerChoiceStrategies) {
593 expect(workerChoiceStrategy.opts).toStrictEqual(
594 expect.objectContaining({
597 Object.keys(workerChoiceStrategy.opts.weights).length,
598 runTime: { median: false },
599 waitTime: { median: false },
600 elu: { median: false }
605 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
624 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
627 'Invalid worker choice strategy options: must be a plain object'
630 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
632 'Invalid worker choice strategy options: must have a weight for each worker node'
636 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
639 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
645 it('Verify that pool tasks queue can be enabled/disabled', async () => {
646 const pool = new FixedThreadPool(
648 './tests/worker-files/thread/testWorker.mjs'
650 expect(pool.opts.enableTasksQueue).toBe(false)
651 expect(pool.opts.tasksQueueOptions).toBeUndefined()
652 pool.enableTasksQueue(true)
653 expect(pool.opts.enableTasksQueue).toBe(true)
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 pool.enableTasksQueue(true, { concurrency: 2 })
662 expect(pool.opts.enableTasksQueue).toBe(true)
663 expect(pool.opts.tasksQueueOptions).toStrictEqual({
665 size: Math.pow(numberOfWorkers, 2),
667 tasksStealingOnBackPressure: true,
668 tasksFinishedTimeout: 2000
670 pool.enableTasksQueue(false)
671 expect(pool.opts.enableTasksQueue).toBe(false)
672 expect(pool.opts.tasksQueueOptions).toBeUndefined()
676 it('Verify that pool tasks queue options can be set', async () => {
677 const pool = new FixedThreadPool(
679 './tests/worker-files/thread/testWorker.mjs',
680 { enableTasksQueue: true }
682 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 size: Math.pow(numberOfWorkers, 2),
686 tasksStealingOnBackPressure: true,
687 tasksFinishedTimeout: 2000
689 for (const workerNode of pool.workerNodes) {
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
694 pool.setTasksQueueOptions({
698 tasksStealingOnBackPressure: false,
699 tasksFinishedTimeout: 3000
701 expect(pool.opts.tasksQueueOptions).toStrictEqual({
705 tasksStealingOnBackPressure: false,
706 tasksFinishedTimeout: 3000
708 for (const workerNode of pool.workerNodes) {
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
713 pool.setTasksQueueOptions({
716 tasksStealingOnBackPressure: true
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
720 size: Math.pow(numberOfWorkers, 2),
722 tasksStealingOnBackPressure: true,
723 tasksFinishedTimeout: 2000
725 for (const workerNode of pool.workerNodes) {
726 expect(workerNode.tasksQueueBackPressureSize).toBe(
727 pool.opts.tasksQueueOptions.size
730 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
731 new TypeError('Invalid tasks queue options: must be a plain object')
733 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
735 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
738 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
740 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
743 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
744 new TypeError('Invalid worker node tasks concurrency: must be an integer')
746 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
748 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
751 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
753 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
756 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
757 new TypeError('Invalid worker node tasks queue size: must be an integer')
762 it('Verify that pool info is set', async () => {
763 let pool = new FixedThreadPool(
765 './tests/worker-files/thread/testWorker.mjs'
767 expect(pool.info).toStrictEqual({
769 type: PoolTypes.fixed,
770 worker: WorkerTypes.thread,
773 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
774 minSize: numberOfWorkers,
775 maxSize: numberOfWorkers,
776 workerNodes: numberOfWorkers,
777 idleWorkerNodes: numberOfWorkers,
784 pool = new DynamicClusterPool(
785 Math.floor(numberOfWorkers / 2),
787 './tests/worker-files/cluster/testWorker.js'
789 expect(pool.info).toStrictEqual({
791 type: PoolTypes.dynamic,
792 worker: WorkerTypes.cluster,
795 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
796 minSize: Math.floor(numberOfWorkers / 2),
797 maxSize: numberOfWorkers,
798 workerNodes: Math.floor(numberOfWorkers / 2),
799 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
808 it('Verify that pool worker tasks usage are initialized', async () => {
809 const pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.js'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.usage).toStrictEqual({
821 sequentiallyStolen: 0,
826 history: new CircularArray()
829 history: new CircularArray()
833 history: new CircularArray()
836 history: new CircularArray()
844 it('Verify that pool worker tasks queue are initialized', async () => {
845 let pool = new FixedClusterPool(
847 './tests/worker-files/cluster/testWorker.js'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
856 pool = new DynamicThreadPool(
857 Math.floor(numberOfWorkers / 2),
859 './tests/worker-files/thread/testWorker.mjs'
861 for (const workerNode of pool.workerNodes) {
862 expect(workerNode).toBeInstanceOf(WorkerNode)
863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
864 expect(workerNode.tasksQueue.size).toBe(0)
865 expect(workerNode.tasksQueue.maxSize).toBe(0)
870 it('Verify that pool worker info are initialized', async () => {
871 let pool = new FixedClusterPool(
873 './tests/worker-files/cluster/testWorker.js'
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
877 expect(workerNode.info).toStrictEqual({
878 id: expect.any(Number),
879 type: WorkerTypes.cluster,
885 pool = new DynamicThreadPool(
886 Math.floor(numberOfWorkers / 2),
888 './tests/worker-files/thread/testWorker.mjs'
890 for (const workerNode of pool.workerNodes) {
891 expect(workerNode).toBeInstanceOf(WorkerNode)
892 expect(workerNode.info).toStrictEqual({
893 id: expect.any(Number),
894 type: WorkerTypes.thread,
902 it('Verify that pool statuses are checked at start or destroy', async () => {
903 const pool = new FixedThreadPool(
905 './tests/worker-files/thread/testWorker.mjs'
907 expect(pool.info.started).toBe(true)
908 expect(pool.info.ready).toBe(true)
909 expect(() => pool.start()).toThrow(
910 new Error('Cannot start an already started pool')
913 expect(pool.info.started).toBe(false)
914 expect(pool.info.ready).toBe(false)
915 await expect(pool.destroy()).rejects.toThrow(
916 new Error('Cannot destroy an already destroyed pool')
920 it('Verify that pool can be started after initialization', async () => {
921 const pool = new FixedClusterPool(
923 './tests/worker-files/cluster/testWorker.js',
928 expect(pool.info.started).toBe(false)
929 expect(pool.info.ready).toBe(false)
930 expect(pool.readyEventEmitted).toBe(false)
931 expect(pool.workerNodes).toStrictEqual([])
932 await expect(pool.execute()).rejects.toThrow(
933 new Error('Cannot execute a task on not started pool')
936 expect(pool.info.started).toBe(true)
937 expect(pool.info.ready).toBe(true)
938 await waitPoolEvents(pool, PoolEvents.ready, 1)
939 expect(pool.readyEventEmitted).toBe(true)
940 expect(pool.workerNodes.length).toBe(numberOfWorkers)
941 for (const workerNode of pool.workerNodes) {
942 expect(workerNode).toBeInstanceOf(WorkerNode)
947 it('Verify that pool execute() arguments are checked', async () => {
948 const pool = new FixedClusterPool(
950 './tests/worker-files/cluster/testWorker.js'
952 await expect(pool.execute(undefined, 0)).rejects.toThrow(
953 new TypeError('name argument must be a string')
955 await expect(pool.execute(undefined, '')).rejects.toThrow(
956 new TypeError('name argument must not be an empty string')
958 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
959 new TypeError('transferList argument must be an array')
961 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
962 "Task function 'unknown' not found"
965 await expect(pool.execute()).rejects.toThrow(
966 new Error('Cannot execute a task on not started pool')
970 it('Verify that pool worker tasks usage are computed', async () => {
971 const pool = new FixedClusterPool(
973 './tests/worker-files/cluster/testWorker.js'
975 const promises = new Set()
976 const maxMultiplier = 2
977 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
978 promises.add(pool.execute())
980 for (const workerNode of pool.workerNodes) {
981 expect(workerNode.usage).toStrictEqual({
984 executing: maxMultiplier,
987 sequentiallyStolen: 0,
992 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
999 history: expect.any(CircularArray)
1002 history: expect.any(CircularArray)
1007 await Promise.all(promises)
1008 for (const workerNode of pool.workerNodes) {
1009 expect(workerNode.usage).toStrictEqual({
1011 executed: maxMultiplier,
1015 sequentiallyStolen: 0,
1020 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1027 history: expect.any(CircularArray)
1030 history: expect.any(CircularArray)
1035 await pool.destroy()
1038 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1039 const pool = new DynamicThreadPool(
1040 Math.floor(numberOfWorkers / 2),
1042 './tests/worker-files/thread/testWorker.mjs'
1044 const promises = new Set()
1045 const maxMultiplier = 2
1046 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1047 promises.add(pool.execute())
1049 await Promise.all(promises)
1050 for (const workerNode of pool.workerNodes) {
1051 expect(workerNode.usage).toStrictEqual({
1053 executed: expect.any(Number),
1057 sequentiallyStolen: 0,
1062 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1069 history: expect.any(CircularArray)
1072 history: expect.any(CircularArray)
1076 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1077 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1078 numberOfWorkers * maxMultiplier
1080 expect(workerNode.usage.runTime.history.length).toBe(0)
1081 expect(workerNode.usage.waitTime.history.length).toBe(0)
1082 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1083 expect(workerNode.usage.elu.active.history.length).toBe(0)
1085 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1086 for (const workerNode of pool.workerNodes) {
1087 expect(workerNode.usage).toStrictEqual({
1093 sequentiallyStolen: 0,
1098 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1105 history: expect.any(CircularArray)
1108 history: expect.any(CircularArray)
1112 expect(workerNode.usage.runTime.history.length).toBe(0)
1113 expect(workerNode.usage.waitTime.history.length).toBe(0)
1114 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1115 expect(workerNode.usage.elu.active.history.length).toBe(0)
1117 await pool.destroy()
1120 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1121 const pool = new DynamicClusterPool(
1122 Math.floor(numberOfWorkers / 2),
1124 './tests/worker-files/cluster/testWorker.js'
1126 expect(pool.emitter.eventNames()).toStrictEqual([])
1129 pool.emitter.on(PoolEvents.ready, info => {
1133 await waitPoolEvents(pool, PoolEvents.ready, 1)
1134 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1135 expect(poolReady).toBe(1)
1136 expect(poolInfo).toStrictEqual({
1138 type: PoolTypes.dynamic,
1139 worker: WorkerTypes.cluster,
1142 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1143 minSize: expect.any(Number),
1144 maxSize: expect.any(Number),
1145 workerNodes: expect.any(Number),
1146 idleWorkerNodes: expect.any(Number),
1147 busyWorkerNodes: expect.any(Number),
1148 executedTasks: expect.any(Number),
1149 executingTasks: expect.any(Number),
1150 failedTasks: expect.any(Number)
1152 await pool.destroy()
1155 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1156 const pool = new FixedThreadPool(
1158 './tests/worker-files/thread/testWorker.mjs'
1160 expect(pool.emitter.eventNames()).toStrictEqual([])
1161 const promises = new Set()
1164 pool.emitter.on(PoolEvents.busy, info => {
1168 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1169 for (let i = 0; i < numberOfWorkers * 2; i++) {
1170 promises.add(pool.execute())
1172 await Promise.all(promises)
1173 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1174 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1175 expect(poolBusy).toBe(numberOfWorkers + 1)
1176 expect(poolInfo).toStrictEqual({
1178 type: PoolTypes.fixed,
1179 worker: WorkerTypes.thread,
1182 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1183 minSize: expect.any(Number),
1184 maxSize: expect.any(Number),
1185 workerNodes: expect.any(Number),
1186 idleWorkerNodes: expect.any(Number),
1187 busyWorkerNodes: expect.any(Number),
1188 executedTasks: expect.any(Number),
1189 executingTasks: expect.any(Number),
1190 failedTasks: expect.any(Number)
1192 await pool.destroy()
1195 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1196 const pool = new DynamicThreadPool(
1197 Math.floor(numberOfWorkers / 2),
1199 './tests/worker-files/thread/testWorker.mjs'
1201 expect(pool.emitter.eventNames()).toStrictEqual([])
1202 const promises = new Set()
1205 pool.emitter.on(PoolEvents.full, info => {
1209 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1210 for (let i = 0; i < numberOfWorkers * 2; i++) {
1211 promises.add(pool.execute())
1213 await Promise.all(promises)
1214 expect(poolFull).toBe(1)
1215 expect(poolInfo).toStrictEqual({
1217 type: PoolTypes.dynamic,
1218 worker: WorkerTypes.thread,
1221 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1222 minSize: expect.any(Number),
1223 maxSize: expect.any(Number),
1224 workerNodes: expect.any(Number),
1225 idleWorkerNodes: expect.any(Number),
1226 busyWorkerNodes: expect.any(Number),
1227 executedTasks: expect.any(Number),
1228 executingTasks: expect.any(Number),
1229 failedTasks: expect.any(Number)
1231 await pool.destroy()
1234 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1235 const pool = new FixedThreadPool(
1237 './tests/worker-files/thread/testWorker.mjs',
1239 enableTasksQueue: true
1242 stub(pool, 'hasBackPressure').returns(true)
1243 expect(pool.emitter.eventNames()).toStrictEqual([])
1244 const promises = new Set()
1245 let poolBackPressure = 0
1247 pool.emitter.on(PoolEvents.backPressure, info => {
1251 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1252 for (let i = 0; i < numberOfWorkers + 1; i++) {
1253 promises.add(pool.execute())
1255 await Promise.all(promises)
1256 expect(poolBackPressure).toBe(1)
1257 expect(poolInfo).toStrictEqual({
1259 type: PoolTypes.fixed,
1260 worker: WorkerTypes.thread,
1263 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1264 minSize: expect.any(Number),
1265 maxSize: expect.any(Number),
1266 workerNodes: expect.any(Number),
1267 idleWorkerNodes: expect.any(Number),
1268 busyWorkerNodes: expect.any(Number),
1269 executedTasks: expect.any(Number),
1270 executingTasks: expect.any(Number),
1271 maxQueuedTasks: expect.any(Number),
1272 queuedTasks: expect.any(Number),
1274 stolenTasks: expect.any(Number),
1275 failedTasks: expect.any(Number)
1277 expect(pool.hasBackPressure.callCount).toBe(5)
1278 await pool.destroy()
1281 it('Verify that destroy() waits for queued tasks to finish', async () => {
1282 const tasksFinishedTimeout = 2500
1283 const pool = new FixedThreadPool(
1285 './tests/worker-files/thread/asyncWorker.mjs',
1287 enableTasksQueue: true,
1288 tasksQueueOptions: { tasksFinishedTimeout }
1291 const maxMultiplier = 4
1292 let tasksFinished = 0
1293 for (const workerNode of pool.workerNodes) {
1294 workerNode.on('taskFinished', () => {
1298 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1301 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1302 const startTime = performance.now()
1303 await pool.destroy()
1304 const elapsedTime = performance.now() - startTime
1305 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1306 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1307 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1310 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1311 const tasksFinishedTimeout = 1000
1312 const pool = new FixedThreadPool(
1314 './tests/worker-files/thread/asyncWorker.mjs',
1316 enableTasksQueue: true,
1317 tasksQueueOptions: { tasksFinishedTimeout }
1320 const maxMultiplier = 4
1321 let tasksFinished = 0
1322 for (const workerNode of pool.workerNodes) {
1323 workerNode.on('taskFinished', () => {
1327 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1330 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1331 const startTime = performance.now()
1332 await pool.destroy()
1333 const elapsedTime = performance.now() - startTime
1334 expect(tasksFinished).toBe(0)
1335 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1338 it('Verify that pool asynchronous resource track tasks execution', async () => {
1343 let resolveCalls = 0
1344 const hook = createHook({
1345 init (asyncId, type) {
1346 if (type === 'poolifier:task') {
1348 taskAsyncId = asyncId
1352 if (asyncId === taskAsyncId) beforeCalls++
1355 if (asyncId === taskAsyncId) afterCalls++
1358 if (executionAsyncId() === taskAsyncId) resolveCalls++
1361 const pool = new FixedThreadPool(
1363 './tests/worker-files/thread/testWorker.mjs'
1366 await pool.execute()
1368 expect(initCalls).toBe(1)
1369 expect(beforeCalls).toBe(1)
1370 expect(afterCalls).toBe(1)
1371 expect(resolveCalls).toBe(1)
1372 await pool.destroy()
1375 it('Verify that hasTaskFunction() is working', async () => {
1376 const dynamicThreadPool = new DynamicThreadPool(
1377 Math.floor(numberOfWorkers / 2),
1379 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1381 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1382 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1383 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1386 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1387 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1388 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1389 await dynamicThreadPool.destroy()
1390 const fixedClusterPool = new FixedClusterPool(
1392 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1394 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1395 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1396 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1399 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1400 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1401 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1402 await fixedClusterPool.destroy()
1405 it('Verify that addTaskFunction() is working', async () => {
1406 const dynamicThreadPool = new DynamicThreadPool(
1407 Math.floor(numberOfWorkers / 2),
1409 './tests/worker-files/thread/testWorker.mjs'
1411 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1413 dynamicThreadPool.addTaskFunction(0, () => {})
1414 ).rejects.toThrow(new TypeError('name argument must be a string'))
1416 dynamicThreadPool.addTaskFunction('', () => {})
1418 new TypeError('name argument must not be an empty string')
1420 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1421 new TypeError('fn argument must be a function')
1423 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1424 new TypeError('fn argument must be a function')
1426 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1430 const echoTaskFunction = data => {
1434 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1435 ).resolves.toBe(true)
1436 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1437 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1440 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1445 const taskFunctionData = { test: 'test' }
1446 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1447 expect(echoResult).toStrictEqual(taskFunctionData)
1448 for (const workerNode of dynamicThreadPool.workerNodes) {
1449 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1451 executed: expect.any(Number),
1454 sequentiallyStolen: 0,
1459 history: new CircularArray()
1462 history: new CircularArray()
1466 history: new CircularArray()
1469 history: new CircularArray()
1474 await dynamicThreadPool.destroy()
1477 it('Verify that removeTaskFunction() is working', async () => {
1478 const dynamicThreadPool = new DynamicThreadPool(
1479 Math.floor(numberOfWorkers / 2),
1481 './tests/worker-files/thread/testWorker.mjs'
1483 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1484 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1488 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1489 new Error('Cannot remove a task function not handled on the pool side')
1491 const echoTaskFunction = data => {
1494 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1495 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1496 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1499 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1504 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1507 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1508 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1509 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1513 await dynamicThreadPool.destroy()
1516 it('Verify that listTaskFunctionNames() is working', async () => {
1517 const dynamicThreadPool = new DynamicThreadPool(
1518 Math.floor(numberOfWorkers / 2),
1520 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1522 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1523 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 'jsonIntegerSerialization',
1529 await dynamicThreadPool.destroy()
1530 const fixedClusterPool = new FixedClusterPool(
1532 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1534 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1535 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1537 'jsonIntegerSerialization',
1541 await fixedClusterPool.destroy()
1544 it('Verify that setDefaultTaskFunction() is working', async () => {
1545 const dynamicThreadPool = new DynamicThreadPool(
1546 Math.floor(numberOfWorkers / 2),
1548 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1550 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1551 const workerId = dynamicThreadPool.workerNodes[0].info.id
1552 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1554 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1558 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1561 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1565 dynamicThreadPool.setDefaultTaskFunction('unknown')
1568 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1571 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1573 'jsonIntegerSerialization',
1578 dynamicThreadPool.setDefaultTaskFunction('factorial')
1579 ).resolves.toBe(true)
1580 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1583 'jsonIntegerSerialization',
1587 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1588 ).resolves.toBe(true)
1589 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1592 'jsonIntegerSerialization',
1595 await dynamicThreadPool.destroy()
1598 it('Verify that multiple task functions worker is working', async () => {
1599 const pool = new DynamicClusterPool(
1600 Math.floor(numberOfWorkers / 2),
1602 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1604 const data = { n: 10 }
1605 const result0 = await pool.execute(data)
1606 expect(result0).toStrictEqual({ ok: 1 })
1607 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1608 expect(result1).toStrictEqual({ ok: 1 })
1609 const result2 = await pool.execute(data, 'factorial')
1610 expect(result2).toBe(3628800)
1611 const result3 = await pool.execute(data, 'fibonacci')
1612 expect(result3).toBe(55)
1613 expect(pool.info.executingTasks).toBe(0)
1614 expect(pool.info.executedTasks).toBe(4)
1615 for (const workerNode of pool.workerNodes) {
1616 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1618 'jsonIntegerSerialization',
1622 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1623 for (const name of pool.listTaskFunctionNames()) {
1624 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1626 executed: expect.any(Number),
1630 sequentiallyStolen: 0,
1634 history: expect.any(CircularArray)
1637 history: expect.any(CircularArray)
1641 history: expect.any(CircularArray)
1644 history: expect.any(CircularArray)
1649 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1650 ).toBeGreaterThan(0)
1653 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1655 workerNode.getTaskFunctionWorkerUsage(
1656 workerNode.info.taskFunctionNames[1]
1660 await pool.destroy()
1663 it('Verify sendKillMessageToWorker()', async () => {
1664 const pool = new DynamicClusterPool(
1665 Math.floor(numberOfWorkers / 2),
1667 './tests/worker-files/cluster/testWorker.js'
1669 const workerNodeKey = 0
1671 pool.sendKillMessageToWorker(workerNodeKey)
1672 ).resolves.toBeUndefined()
1674 pool.sendKillMessageToWorker(numberOfWorkers)
1675 ).rejects.toStrictEqual(
1676 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1678 await pool.destroy()
1681 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1682 const pool = new DynamicClusterPool(
1683 Math.floor(numberOfWorkers / 2),
1685 './tests/worker-files/cluster/testWorker.js'
1687 const workerNodeKey = 0
1689 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1690 taskFunctionOperation: 'add',
1691 taskFunctionName: 'empty',
1692 taskFunction: (() => {}).toString()
1694 ).resolves.toBe(true)
1696 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1697 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1698 await pool.destroy()
1701 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1702 const pool = new DynamicClusterPool(
1703 Math.floor(numberOfWorkers / 2),
1705 './tests/worker-files/cluster/testWorker.js'
1708 pool.sendTaskFunctionOperationToWorkers({
1709 taskFunctionOperation: 'add',
1710 taskFunctionName: 'empty',
1711 taskFunction: (() => {}).toString()
1713 ).resolves.toBe(true)
1714 for (const workerNode of pool.workerNodes) {
1715 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1721 await pool.destroy()