1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.js'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.js'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.js'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
234 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
235 runTime: { median: false },
236 waitTime: { median: false },
237 elu: { median: false },
238 weights: expect.objectContaining({
239 0: expect.any(Number),
240 [pool.info.maxSize - 1]: expect.any(Number)
243 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
244 .workerChoiceStrategies) {
245 expect(workerChoiceStrategy.opts).toStrictEqual(
246 expect.objectContaining({
249 Object.keys(workerChoiceStrategy.opts.weights).length,
250 runTime: { median: false },
251 waitTime: { median: false },
252 elu: { median: false }
253 // weights: expect.objectContaining({
254 // 0: expect.any(Number),
255 // [pool.info.maxSize - 1]: expect.any(Number)
261 const testHandler = () => console.info('test handler executed')
262 pool = new FixedThreadPool(
264 './tests/worker-files/thread/testWorker.mjs',
266 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
267 workerChoiceStrategyOptions: {
268 runTime: { median: true },
269 weights: { 0: 300, 1: 200 }
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
274 tasksQueueOptions: { concurrency: 2 },
275 messageHandler: testHandler,
276 errorHandler: testHandler,
277 onlineHandler: testHandler,
278 exitHandler: testHandler
281 expect(pool.emitter).toBeUndefined()
282 expect(pool.opts).toStrictEqual({
285 restartWorkerOnError: false,
286 enableTasksQueue: true,
289 size: Math.pow(numberOfWorkers, 2),
291 tasksStealingOnBackPressure: true,
292 tasksFinishedTimeout: 2000
294 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
295 workerChoiceStrategyOptions: {
296 runTime: { median: true },
297 weights: { 0: 300, 1: 200 }
299 onlineHandler: testHandler,
300 messageHandler: testHandler,
301 errorHandler: testHandler,
302 exitHandler: testHandler
304 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
307 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
308 runTime: { median: true },
309 waitTime: { median: false },
310 elu: { median: false },
311 weights: { 0: 300, 1: 200 }
313 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
314 .workerChoiceStrategies) {
315 expect(workerChoiceStrategy.opts).toStrictEqual({
318 Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
319 runTime: { median: true },
320 waitTime: { median: false },
321 elu: { median: false },
322 weights: { 0: 300, 1: 200 }
328 it('Verify that pool options are validated', () => {
333 './tests/worker-files/thread/testWorker.mjs',
335 workerChoiceStrategy: 'invalidStrategy'
338 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
343 './tests/worker-files/thread/testWorker.mjs',
345 workerChoiceStrategyOptions: { weights: {} }
350 'Invalid worker choice strategy options: must have a weight for each worker node'
357 './tests/worker-files/thread/testWorker.mjs',
359 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
364 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
371 './tests/worker-files/thread/testWorker.mjs',
373 enableTasksQueue: true,
374 tasksQueueOptions: 'invalidTasksQueueOptions'
378 new TypeError('Invalid tasks queue options: must be a plain object')
384 './tests/worker-files/thread/testWorker.mjs',
386 enableTasksQueue: true,
387 tasksQueueOptions: { concurrency: 0 }
392 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
399 './tests/worker-files/thread/testWorker.mjs',
401 enableTasksQueue: true,
402 tasksQueueOptions: { concurrency: -1 }
407 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
414 './tests/worker-files/thread/testWorker.mjs',
416 enableTasksQueue: true,
417 tasksQueueOptions: { concurrency: 0.2 }
421 new TypeError('Invalid worker node tasks concurrency: must be an integer')
427 './tests/worker-files/thread/testWorker.mjs',
429 enableTasksQueue: true,
430 tasksQueueOptions: { size: 0 }
435 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
442 './tests/worker-files/thread/testWorker.mjs',
444 enableTasksQueue: true,
445 tasksQueueOptions: { size: -1 }
450 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
457 './tests/worker-files/thread/testWorker.mjs',
459 enableTasksQueue: true,
460 tasksQueueOptions: { size: 0.2 }
464 new TypeError('Invalid worker node tasks queue size: must be an integer')
468 it('Verify that pool worker choice strategy options can be set', async () => {
469 const pool = new FixedThreadPool(
471 './tests/worker-files/thread/testWorker.mjs',
472 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
474 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
475 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
478 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
479 runTime: { median: false },
480 waitTime: { median: false },
481 elu: { median: false },
482 weights: expect.objectContaining({
483 0: expect.any(Number),
484 [pool.info.maxSize - 1]: expect.any(Number)
487 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
488 .workerChoiceStrategies) {
489 expect(workerChoiceStrategy.opts).toStrictEqual({
492 Object.keys(workerChoiceStrategy.opts.weights).length,
493 runTime: { median: false },
494 waitTime: { median: false },
495 elu: { median: false },
496 weights: expect.objectContaining({
497 0: expect.any(Number),
498 [pool.info.maxSize - 1]: expect.any(Number)
503 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
521 pool.setWorkerChoiceStrategyOptions({
522 runTime: { median: true },
523 elu: { median: true }
525 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
526 runTime: { median: true },
527 elu: { median: true }
529 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
532 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
533 runTime: { median: true },
534 waitTime: { median: false },
535 elu: { median: true },
536 weights: expect.objectContaining({
537 0: expect.any(Number),
538 [pool.info.maxSize - 1]: expect.any(Number)
541 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
542 .workerChoiceStrategies) {
543 expect(workerChoiceStrategy.opts).toStrictEqual({
546 Object.keys(workerChoiceStrategy.opts.weights).length,
547 runTime: { median: true },
548 waitTime: { median: false },
549 elu: { median: true },
550 weights: expect.objectContaining({
551 0: expect.any(Number),
552 [pool.info.maxSize - 1]: expect.any(Number)
557 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
575 pool.setWorkerChoiceStrategyOptions({
576 runTime: { median: false },
577 elu: { median: false }
579 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
580 runTime: { median: false },
581 elu: { median: false }
583 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
586 Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
587 runTime: { median: false },
588 waitTime: { median: false },
589 elu: { median: false },
590 weights: expect.objectContaining({
591 0: expect.any(Number),
592 [pool.info.maxSize - 1]: expect.any(Number)
595 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
596 .workerChoiceStrategies) {
597 expect(workerChoiceStrategy.opts).toStrictEqual({
600 Object.keys(workerChoiceStrategy.opts.weights).length,
601 runTime: { median: false },
602 waitTime: { median: false },
603 elu: { median: false },
604 weights: expect.objectContaining({
605 0: expect.any(Number),
606 [pool.info.maxSize - 1]: expect.any(Number)
611 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
630 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
633 'Invalid worker choice strategy options: must be a plain object'
636 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
638 'Invalid worker choice strategy options: must have a weight for each worker node'
642 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
645 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
651 it('Verify that pool tasks queue can be enabled/disabled', async () => {
652 const pool = new FixedThreadPool(
654 './tests/worker-files/thread/testWorker.mjs'
656 expect(pool.opts.enableTasksQueue).toBe(false)
657 expect(pool.opts.tasksQueueOptions).toBeUndefined()
658 pool.enableTasksQueue(true)
659 expect(pool.opts.enableTasksQueue).toBe(true)
660 expect(pool.opts.tasksQueueOptions).toStrictEqual({
662 size: Math.pow(numberOfWorkers, 2),
664 tasksStealingOnBackPressure: true,
665 tasksFinishedTimeout: 2000
667 pool.enableTasksQueue(true, { concurrency: 2 })
668 expect(pool.opts.enableTasksQueue).toBe(true)
669 expect(pool.opts.tasksQueueOptions).toStrictEqual({
671 size: Math.pow(numberOfWorkers, 2),
673 tasksStealingOnBackPressure: true,
674 tasksFinishedTimeout: 2000
676 pool.enableTasksQueue(false)
677 expect(pool.opts.enableTasksQueue).toBe(false)
678 expect(pool.opts.tasksQueueOptions).toBeUndefined()
682 it('Verify that pool tasks queue options can be set', async () => {
683 const pool = new FixedThreadPool(
685 './tests/worker-files/thread/testWorker.mjs',
686 { enableTasksQueue: true }
688 expect(pool.opts.tasksQueueOptions).toStrictEqual({
690 size: Math.pow(numberOfWorkers, 2),
692 tasksStealingOnBackPressure: true,
693 tasksFinishedTimeout: 2000
695 for (const workerNode of pool.workerNodes) {
696 expect(workerNode.tasksQueueBackPressureSize).toBe(
697 pool.opts.tasksQueueOptions.size
700 pool.setTasksQueueOptions({
704 tasksStealingOnBackPressure: false,
705 tasksFinishedTimeout: 3000
707 expect(pool.opts.tasksQueueOptions).toStrictEqual({
711 tasksStealingOnBackPressure: false,
712 tasksFinishedTimeout: 3000
714 for (const workerNode of pool.workerNodes) {
715 expect(workerNode.tasksQueueBackPressureSize).toBe(
716 pool.opts.tasksQueueOptions.size
719 pool.setTasksQueueOptions({
722 tasksStealingOnBackPressure: true
724 expect(pool.opts.tasksQueueOptions).toStrictEqual({
726 size: Math.pow(numberOfWorkers, 2),
728 tasksStealingOnBackPressure: true,
729 tasksFinishedTimeout: 2000
731 for (const workerNode of pool.workerNodes) {
732 expect(workerNode.tasksQueueBackPressureSize).toBe(
733 pool.opts.tasksQueueOptions.size
736 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
737 new TypeError('Invalid tasks queue options: must be a plain object')
739 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
741 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
744 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
746 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
749 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
750 new TypeError('Invalid worker node tasks concurrency: must be an integer')
752 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
754 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
757 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
759 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
762 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
763 new TypeError('Invalid worker node tasks queue size: must be an integer')
768 it('Verify that pool info is set', async () => {
769 let pool = new FixedThreadPool(
771 './tests/worker-files/thread/testWorker.mjs'
773 expect(pool.info).toStrictEqual({
775 type: PoolTypes.fixed,
776 worker: WorkerTypes.thread,
779 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
780 minSize: numberOfWorkers,
781 maxSize: numberOfWorkers,
782 workerNodes: numberOfWorkers,
783 idleWorkerNodes: numberOfWorkers,
790 pool = new DynamicClusterPool(
791 Math.floor(numberOfWorkers / 2),
793 './tests/worker-files/cluster/testWorker.js'
795 expect(pool.info).toStrictEqual({
797 type: PoolTypes.dynamic,
798 worker: WorkerTypes.cluster,
801 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
802 minSize: Math.floor(numberOfWorkers / 2),
803 maxSize: numberOfWorkers,
804 workerNodes: Math.floor(numberOfWorkers / 2),
805 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
814 it('Verify that pool worker tasks usage are initialized', async () => {
815 const pool = new FixedClusterPool(
817 './tests/worker-files/cluster/testWorker.js'
819 for (const workerNode of pool.workerNodes) {
820 expect(workerNode).toBeInstanceOf(WorkerNode)
821 expect(workerNode.usage).toStrictEqual({
827 sequentiallyStolen: 0,
832 history: new CircularArray()
835 history: new CircularArray()
839 history: new CircularArray()
842 history: new CircularArray()
850 it('Verify that pool worker tasks queue are initialized', async () => {
851 let pool = new FixedClusterPool(
853 './tests/worker-files/cluster/testWorker.js'
855 for (const workerNode of pool.workerNodes) {
856 expect(workerNode).toBeInstanceOf(WorkerNode)
857 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
858 expect(workerNode.tasksQueue.size).toBe(0)
859 expect(workerNode.tasksQueue.maxSize).toBe(0)
862 pool = new DynamicThreadPool(
863 Math.floor(numberOfWorkers / 2),
865 './tests/worker-files/thread/testWorker.mjs'
867 for (const workerNode of pool.workerNodes) {
868 expect(workerNode).toBeInstanceOf(WorkerNode)
869 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
870 expect(workerNode.tasksQueue.size).toBe(0)
871 expect(workerNode.tasksQueue.maxSize).toBe(0)
876 it('Verify that pool worker info are initialized', async () => {
877 let pool = new FixedClusterPool(
879 './tests/worker-files/cluster/testWorker.js'
881 for (const workerNode of pool.workerNodes) {
882 expect(workerNode).toBeInstanceOf(WorkerNode)
883 expect(workerNode.info).toStrictEqual({
884 id: expect.any(Number),
885 type: WorkerTypes.cluster,
891 pool = new DynamicThreadPool(
892 Math.floor(numberOfWorkers / 2),
894 './tests/worker-files/thread/testWorker.mjs'
896 for (const workerNode of pool.workerNodes) {
897 expect(workerNode).toBeInstanceOf(WorkerNode)
898 expect(workerNode.info).toStrictEqual({
899 id: expect.any(Number),
900 type: WorkerTypes.thread,
908 it('Verify that pool statuses are checked at start or destroy', async () => {
909 const pool = new FixedThreadPool(
911 './tests/worker-files/thread/testWorker.mjs'
913 expect(pool.info.started).toBe(true)
914 expect(pool.info.ready).toBe(true)
915 expect(() => pool.start()).toThrow(
916 new Error('Cannot start an already started pool')
919 expect(pool.info.started).toBe(false)
920 expect(pool.info.ready).toBe(false)
921 await expect(pool.destroy()).rejects.toThrow(
922 new Error('Cannot destroy an already destroyed pool')
926 it('Verify that pool can be started after initialization', async () => {
927 const pool = new FixedClusterPool(
929 './tests/worker-files/cluster/testWorker.js',
934 expect(pool.info.started).toBe(false)
935 expect(pool.info.ready).toBe(false)
936 expect(pool.readyEventEmitted).toBe(false)
937 expect(pool.workerNodes).toStrictEqual([])
938 await expect(pool.execute()).rejects.toThrow(
939 new Error('Cannot execute a task on not started pool')
942 expect(pool.info.started).toBe(true)
943 expect(pool.info.ready).toBe(true)
944 await waitPoolEvents(pool, PoolEvents.ready, 1)
945 expect(pool.readyEventEmitted).toBe(true)
946 expect(pool.workerNodes.length).toBe(numberOfWorkers)
947 for (const workerNode of pool.workerNodes) {
948 expect(workerNode).toBeInstanceOf(WorkerNode)
953 it('Verify that pool execute() arguments are checked', async () => {
954 const pool = new FixedClusterPool(
956 './tests/worker-files/cluster/testWorker.js'
958 await expect(pool.execute(undefined, 0)).rejects.toThrow(
959 new TypeError('name argument must be a string')
961 await expect(pool.execute(undefined, '')).rejects.toThrow(
962 new TypeError('name argument must not be an empty string')
964 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
965 new TypeError('transferList argument must be an array')
967 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
968 "Task function 'unknown' not found"
971 await expect(pool.execute()).rejects.toThrow(
972 new Error('Cannot execute a task on not started pool')
976 it('Verify that pool worker tasks usage are computed', async () => {
977 const pool = new FixedClusterPool(
979 './tests/worker-files/cluster/testWorker.js'
981 const promises = new Set()
982 const maxMultiplier = 2
983 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
984 promises.add(pool.execute())
986 for (const workerNode of pool.workerNodes) {
987 expect(workerNode.usage).toStrictEqual({
990 executing: maxMultiplier,
993 sequentiallyStolen: 0,
998 history: expect.any(CircularArray)
1001 history: expect.any(CircularArray)
1005 history: expect.any(CircularArray)
1008 history: expect.any(CircularArray)
1013 await Promise.all(promises)
1014 for (const workerNode of pool.workerNodes) {
1015 expect(workerNode.usage).toStrictEqual({
1017 executed: maxMultiplier,
1021 sequentiallyStolen: 0,
1026 history: expect.any(CircularArray)
1029 history: expect.any(CircularArray)
1033 history: expect.any(CircularArray)
1036 history: expect.any(CircularArray)
1041 await pool.destroy()
1044 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1045 const pool = new DynamicThreadPool(
1046 Math.floor(numberOfWorkers / 2),
1048 './tests/worker-files/thread/testWorker.mjs'
1050 const promises = new Set()
1051 const maxMultiplier = 2
1052 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1053 promises.add(pool.execute())
1055 await Promise.all(promises)
1056 for (const workerNode of pool.workerNodes) {
1057 expect(workerNode.usage).toStrictEqual({
1059 executed: expect.any(Number),
1063 sequentiallyStolen: 0,
1068 history: expect.any(CircularArray)
1071 history: expect.any(CircularArray)
1075 history: expect.any(CircularArray)
1078 history: expect.any(CircularArray)
1082 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1083 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1084 numberOfWorkers * maxMultiplier
1086 expect(workerNode.usage.runTime.history.length).toBe(0)
1087 expect(workerNode.usage.waitTime.history.length).toBe(0)
1088 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1089 expect(workerNode.usage.elu.active.history.length).toBe(0)
1091 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1092 for (const workerNode of pool.workerNodes) {
1093 expect(workerNode.usage).toStrictEqual({
1099 sequentiallyStolen: 0,
1104 history: expect.any(CircularArray)
1107 history: expect.any(CircularArray)
1111 history: expect.any(CircularArray)
1114 history: expect.any(CircularArray)
1118 expect(workerNode.usage.runTime.history.length).toBe(0)
1119 expect(workerNode.usage.waitTime.history.length).toBe(0)
1120 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1121 expect(workerNode.usage.elu.active.history.length).toBe(0)
1123 await pool.destroy()
1126 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1127 const pool = new DynamicClusterPool(
1128 Math.floor(numberOfWorkers / 2),
1130 './tests/worker-files/cluster/testWorker.js'
1132 expect(pool.emitter.eventNames()).toStrictEqual([])
1135 pool.emitter.on(PoolEvents.ready, info => {
1139 await waitPoolEvents(pool, PoolEvents.ready, 1)
1140 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1141 expect(poolReady).toBe(1)
1142 expect(poolInfo).toStrictEqual({
1144 type: PoolTypes.dynamic,
1145 worker: WorkerTypes.cluster,
1148 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1149 minSize: expect.any(Number),
1150 maxSize: expect.any(Number),
1151 workerNodes: expect.any(Number),
1152 idleWorkerNodes: expect.any(Number),
1153 busyWorkerNodes: expect.any(Number),
1154 executedTasks: expect.any(Number),
1155 executingTasks: expect.any(Number),
1156 failedTasks: expect.any(Number)
1158 await pool.destroy()
1161 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1162 const pool = new FixedThreadPool(
1164 './tests/worker-files/thread/testWorker.mjs'
1166 expect(pool.emitter.eventNames()).toStrictEqual([])
1167 const promises = new Set()
1170 pool.emitter.on(PoolEvents.busy, info => {
1174 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1175 for (let i = 0; i < numberOfWorkers * 2; i++) {
1176 promises.add(pool.execute())
1178 await Promise.all(promises)
1179 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1180 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1181 expect(poolBusy).toBe(numberOfWorkers + 1)
1182 expect(poolInfo).toStrictEqual({
1184 type: PoolTypes.fixed,
1185 worker: WorkerTypes.thread,
1188 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1189 minSize: expect.any(Number),
1190 maxSize: expect.any(Number),
1191 workerNodes: expect.any(Number),
1192 idleWorkerNodes: expect.any(Number),
1193 busyWorkerNodes: expect.any(Number),
1194 executedTasks: expect.any(Number),
1195 executingTasks: expect.any(Number),
1196 failedTasks: expect.any(Number)
1198 await pool.destroy()
1201 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1202 const pool = new DynamicThreadPool(
1203 Math.floor(numberOfWorkers / 2),
1205 './tests/worker-files/thread/testWorker.mjs'
1207 expect(pool.emitter.eventNames()).toStrictEqual([])
1208 const promises = new Set()
1211 pool.emitter.on(PoolEvents.full, info => {
1215 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1216 for (let i = 0; i < numberOfWorkers * 2; i++) {
1217 promises.add(pool.execute())
1219 await Promise.all(promises)
1220 expect(poolFull).toBe(1)
1221 expect(poolInfo).toStrictEqual({
1223 type: PoolTypes.dynamic,
1224 worker: WorkerTypes.thread,
1227 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1228 minSize: expect.any(Number),
1229 maxSize: expect.any(Number),
1230 workerNodes: expect.any(Number),
1231 idleWorkerNodes: expect.any(Number),
1232 busyWorkerNodes: expect.any(Number),
1233 executedTasks: expect.any(Number),
1234 executingTasks: expect.any(Number),
1235 failedTasks: expect.any(Number)
1237 await pool.destroy()
1240 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1241 const pool = new FixedThreadPool(
1243 './tests/worker-files/thread/testWorker.mjs',
1245 enableTasksQueue: true
1248 stub(pool, 'hasBackPressure').returns(true)
1249 expect(pool.emitter.eventNames()).toStrictEqual([])
1250 const promises = new Set()
1251 let poolBackPressure = 0
1253 pool.emitter.on(PoolEvents.backPressure, info => {
1257 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1258 for (let i = 0; i < numberOfWorkers + 1; i++) {
1259 promises.add(pool.execute())
1261 await Promise.all(promises)
1262 expect(poolBackPressure).toBe(1)
1263 expect(poolInfo).toStrictEqual({
1265 type: PoolTypes.fixed,
1266 worker: WorkerTypes.thread,
1269 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1270 minSize: expect.any(Number),
1271 maxSize: expect.any(Number),
1272 workerNodes: expect.any(Number),
1273 idleWorkerNodes: expect.any(Number),
1274 busyWorkerNodes: expect.any(Number),
1275 executedTasks: expect.any(Number),
1276 executingTasks: expect.any(Number),
1277 maxQueuedTasks: expect.any(Number),
1278 queuedTasks: expect.any(Number),
1280 stolenTasks: expect.any(Number),
1281 failedTasks: expect.any(Number)
1283 expect(pool.hasBackPressure.callCount).toBe(5)
1284 await pool.destroy()
1287 it('Verify that destroy() waits for queued tasks to finish', async () => {
1288 const tasksFinishedTimeout = 2500
1289 const pool = new FixedThreadPool(
1291 './tests/worker-files/thread/asyncWorker.mjs',
1293 enableTasksQueue: true,
1294 tasksQueueOptions: { tasksFinishedTimeout }
1297 const maxMultiplier = 4
1298 let tasksFinished = 0
1299 for (const workerNode of pool.workerNodes) {
1300 workerNode.on('taskFinished', () => {
1304 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1307 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1308 const startTime = performance.now()
1309 await pool.destroy()
1310 const elapsedTime = performance.now() - startTime
1311 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
1312 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1313 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
1316 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1317 const tasksFinishedTimeout = 1000
1318 const pool = new FixedThreadPool(
1320 './tests/worker-files/thread/asyncWorker.mjs',
1322 enableTasksQueue: true,
1323 tasksQueueOptions: { tasksFinishedTimeout }
1326 const maxMultiplier = 4
1327 let tasksFinished = 0
1328 for (const workerNode of pool.workerNodes) {
1329 workerNode.on('taskFinished', () => {
1333 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1336 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1337 const startTime = performance.now()
1338 await pool.destroy()
1339 const elapsedTime = performance.now() - startTime
1340 expect(tasksFinished).toBe(0)
1341 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
1344 it('Verify that pool asynchronous resource track tasks execution', async () => {
1349 let resolveCalls = 0
1350 const hook = createHook({
1351 init (asyncId, type) {
1352 if (type === 'poolifier:task') {
1354 taskAsyncId = asyncId
1358 if (asyncId === taskAsyncId) beforeCalls++
1361 if (asyncId === taskAsyncId) afterCalls++
1364 if (executionAsyncId() === taskAsyncId) resolveCalls++
1367 const pool = new FixedThreadPool(
1369 './tests/worker-files/thread/testWorker.mjs'
1372 await pool.execute()
1374 expect(initCalls).toBe(1)
1375 expect(beforeCalls).toBe(1)
1376 expect(afterCalls).toBe(1)
1377 expect(resolveCalls).toBe(1)
1378 await pool.destroy()
1381 it('Verify that hasTaskFunction() is working', async () => {
1382 const dynamicThreadPool = new DynamicThreadPool(
1383 Math.floor(numberOfWorkers / 2),
1385 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1387 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1388 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1389 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1392 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1393 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1394 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1395 await dynamicThreadPool.destroy()
1396 const fixedClusterPool = new FixedClusterPool(
1398 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1400 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1401 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1402 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1405 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1406 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1407 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1408 await fixedClusterPool.destroy()
1411 it('Verify that addTaskFunction() is working', async () => {
1412 const dynamicThreadPool = new DynamicThreadPool(
1413 Math.floor(numberOfWorkers / 2),
1415 './tests/worker-files/thread/testWorker.mjs'
1417 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1419 dynamicThreadPool.addTaskFunction(0, () => {})
1420 ).rejects.toThrow(new TypeError('name argument must be a string'))
1422 dynamicThreadPool.addTaskFunction('', () => {})
1424 new TypeError('name argument must not be an empty string')
1426 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1427 new TypeError('fn argument must be a function')
1429 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1430 new TypeError('fn argument must be a function')
1432 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1436 const echoTaskFunction = data => {
1440 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1441 ).resolves.toBe(true)
1442 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1443 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1446 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1451 const taskFunctionData = { test: 'test' }
1452 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1453 expect(echoResult).toStrictEqual(taskFunctionData)
1454 for (const workerNode of dynamicThreadPool.workerNodes) {
1455 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1457 executed: expect.any(Number),
1460 sequentiallyStolen: 0,
1465 history: new CircularArray()
1468 history: new CircularArray()
1472 history: new CircularArray()
1475 history: new CircularArray()
1480 await dynamicThreadPool.destroy()
1483 it('Verify that removeTaskFunction() is working', async () => {
1484 const dynamicThreadPool = new DynamicThreadPool(
1485 Math.floor(numberOfWorkers / 2),
1487 './tests/worker-files/thread/testWorker.mjs'
1489 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1490 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1494 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1495 new Error('Cannot remove a task function not handled on the pool side')
1497 const echoTaskFunction = data => {
1500 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1501 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1502 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1505 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1510 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1513 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1514 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1515 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1519 await dynamicThreadPool.destroy()
1522 it('Verify that listTaskFunctionNames() is working', async () => {
1523 const dynamicThreadPool = new DynamicThreadPool(
1524 Math.floor(numberOfWorkers / 2),
1526 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1528 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1529 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1531 'jsonIntegerSerialization',
1535 await dynamicThreadPool.destroy()
1536 const fixedClusterPool = new FixedClusterPool(
1538 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1540 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1541 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1543 'jsonIntegerSerialization',
1547 await fixedClusterPool.destroy()
1550 it('Verify that setDefaultTaskFunction() is working', async () => {
1551 const dynamicThreadPool = new DynamicThreadPool(
1552 Math.floor(numberOfWorkers / 2),
1554 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1556 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1557 const workerId = dynamicThreadPool.workerNodes[0].info.id
1558 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1560 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1564 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1567 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1571 dynamicThreadPool.setDefaultTaskFunction('unknown')
1574 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1577 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1579 'jsonIntegerSerialization',
1584 dynamicThreadPool.setDefaultTaskFunction('factorial')
1585 ).resolves.toBe(true)
1586 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1589 'jsonIntegerSerialization',
1593 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1594 ).resolves.toBe(true)
1595 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1598 'jsonIntegerSerialization',
1601 await dynamicThreadPool.destroy()
1604 it('Verify that multiple task functions worker is working', async () => {
1605 const pool = new DynamicClusterPool(
1606 Math.floor(numberOfWorkers / 2),
1608 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1610 const data = { n: 10 }
1611 const result0 = await pool.execute(data)
1612 expect(result0).toStrictEqual({ ok: 1 })
1613 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1614 expect(result1).toStrictEqual({ ok: 1 })
1615 const result2 = await pool.execute(data, 'factorial')
1616 expect(result2).toBe(3628800)
1617 const result3 = await pool.execute(data, 'fibonacci')
1618 expect(result3).toBe(55)
1619 expect(pool.info.executingTasks).toBe(0)
1620 expect(pool.info.executedTasks).toBe(4)
1621 for (const workerNode of pool.workerNodes) {
1622 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1624 'jsonIntegerSerialization',
1628 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1629 for (const name of pool.listTaskFunctionNames()) {
1630 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1632 executed: expect.any(Number),
1636 sequentiallyStolen: 0,
1640 history: expect.any(CircularArray)
1643 history: expect.any(CircularArray)
1647 history: expect.any(CircularArray)
1650 history: expect.any(CircularArray)
1655 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1656 ).toBeGreaterThan(0)
1659 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1661 workerNode.getTaskFunctionWorkerUsage(
1662 workerNode.info.taskFunctionNames[1]
1666 await pool.destroy()
1669 it('Verify sendKillMessageToWorker()', async () => {
1670 const pool = new DynamicClusterPool(
1671 Math.floor(numberOfWorkers / 2),
1673 './tests/worker-files/cluster/testWorker.js'
1675 const workerNodeKey = 0
1677 pool.sendKillMessageToWorker(workerNodeKey)
1678 ).resolves.toBeUndefined()
1680 pool.sendKillMessageToWorker(numberOfWorkers)
1681 ).rejects.toStrictEqual(
1682 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1684 await pool.destroy()
1687 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1688 const pool = new DynamicClusterPool(
1689 Math.floor(numberOfWorkers / 2),
1691 './tests/worker-files/cluster/testWorker.js'
1693 const workerNodeKey = 0
1695 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1696 taskFunctionOperation: 'add',
1697 taskFunctionName: 'empty',
1698 taskFunction: (() => {}).toString()
1700 ).resolves.toBe(true)
1702 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1703 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1704 await pool.destroy()
1707 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1708 const pool = new DynamicClusterPool(
1709 Math.floor(numberOfWorkers / 2),
1711 './tests/worker-files/cluster/testWorker.js'
1714 pool.sendTaskFunctionOperationToWorkers({
1715 taskFunctionOperation: 'add',
1716 taskFunctionName: 'empty',
1717 taskFunction: (() => {}).toString()
1719 ).resolves.toBe(true)
1720 for (const workerNode of pool.workerNodes) {
1721 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1727 await pool.destroy()