1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
// Derived from parsing the package.json located two directories above this test file.
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
// Worker count shared by the pool tests in this suite.
31 const numberOfWorkers = 2
// FixedThreadPool subclass instantiated by the "non main thread/process" creation test.
// NOTE(review): judging by its name it stubs the pool's main-thread check — confirm against the class body.
32 class StubPoolWithIsMain extends FixedThreadPool {
// Smoke test: a FixedThreadPool built from the thread test worker file must be
// an instance of FixedThreadPool.
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
// Error path: instantiating StubPoolWithIsMain is paired with the expected
// "Cannot start a pool from a worker ..." error message.
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
// Errors reported through the handler are only logged to the console.
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
// Checks the lifecycle flags of a freshly constructed FixedThreadPool:
// `started` is true while `starting` and `destroying` are false.
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
// Checks the worker file path validation performed when constructing a FixedThreadPool.
79 it('Verify that filePath is checked', () => {
// No file path argument at all.
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
// File path that is not a string.
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
// String path for which no worker file can be found.
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Error path for a pool created without a number of workers; the string below
// is the expected error message.
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
// Error path: a FixedClusterPool requested with -1 workers is paired with the
// expected "negative number of workers" message.
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
// Error path: a FixedThreadPool requested with a fractional worker count (0.25)
// is paired with the expected "non safe integer" message.
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
// Walks through the size validation errors of the dynamic pools, alternating
// between DynamicClusterPool and DynamicThreadPool. Each constructor call is
// followed by the error message it is expected to produce.
127 it('Verify that dynamic pool sizing is checked', () => {
130 new DynamicClusterPool(
133 './tests/worker-files/cluster/testWorker.js'
137 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
142 new DynamicThreadPool(
145 './tests/worker-files/thread/testWorker.mjs'
149 'Cannot instantiate a pool with a non safe integer number of workers'
154 new DynamicClusterPool(
157 './tests/worker-files/cluster/testWorker.js'
161 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
166 new DynamicThreadPool(
169 './tests/worker-files/thread/testWorker.mjs'
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
178 new DynamicThreadPool(
181 './tests/worker-files/thread/testWorker.mjs'
185 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
190 new DynamicClusterPool(
193 './tests/worker-files/cluster/testWorker.js'
197 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Checks how pool options are resolved, in two passes: a pool created without
// an options object, then a pool created with explicit overrides. In both passes
// the resolved options are compared on pool.opts, on the worker choice strategy
// context, and on every strategy registered in that context.
202 it('Verify that pool options are checked', async () => {
// First pass: no options object is passed.
203 let pool = new FixedThreadPool(
205 './tests/worker-files/thread/testWorker.mjs'
207 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
208 expect(pool.opts).toStrictEqual({
211 restartWorkerOnError: true,
212 enableTasksQueue: false,
213 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
214 workerChoiceStrategyOptions: {
216 runTime: { median: false },
217 waitTime: { median: false },
218 elu: { median: false }
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
223 runTime: { median: false },
224 waitTime: { median: false },
225 elu: { median: false }
227 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
228 .workerChoiceStrategies) {
229 expect(workerChoiceStrategy.opts).toStrictEqual({
231 runTime: { median: false },
232 waitTime: { median: false },
233 elu: { median: false }
// Second pass: the same function is used for every handler option so the
// resolved options can be compared with toStrictEqual.
237 const testHandler = () => console.info('test handler executed')
238 pool = new FixedThreadPool(
240 './tests/worker-files/thread/testWorker.mjs',
242 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
243 workerChoiceStrategyOptions: {
244 runTime: { median: true },
245 weights: { 0: 300, 1: 200 }
248 restartWorkerOnError: false,
249 enableTasksQueue: true,
250 tasksQueueOptions: { concurrency: 2 },
251 messageHandler: testHandler,
252 errorHandler: testHandler,
253 onlineHandler: testHandler,
254 exitHandler: testHandler
// NOTE(review): the emitter is expected to be undefined for this pool, presumably
// because events are disabled in its options — confirm.
257 expect(pool.emitter).toBeUndefined()
258 expect(pool.opts).toStrictEqual({
261 restartWorkerOnError: false,
262 enableTasksQueue: true,
265 size: Math.pow(numberOfWorkers, 2),
267 tasksStealingOnBackPressure: true,
268 tasksFinishedTimeout: 2000
270 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
271 workerChoiceStrategyOptions: {
// Only runTime.median was overridden above; waitTime and elu are expected to
// resolve to median: false.
273 runTime: { median: true },
274 waitTime: { median: false },
275 elu: { median: false },
276 weights: { 0: 300, 1: 200 }
278 onlineHandler: testHandler,
279 messageHandler: testHandler,
280 errorHandler: testHandler,
281 exitHandler: testHandler
283 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
285 runTime: { median: true },
286 waitTime: { median: false },
287 elu: { median: false },
288 weights: { 0: 300, 1: 200 }
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
// Pairs invalid pool options with the error each one is expected to produce.
// Every case uses the thread test worker file.
303 it('Verify that pool options are validated', () => {
// Unknown worker choice strategy name.
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
// Invalid worker choice strategy options: retries, weights, measurement.
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: {
321 retries: 'invalidChoiceRetries'
327 'Invalid worker choice strategy options: retries must be an integer'
334 './tests/worker-files/thread/testWorker.mjs',
336 workerChoiceStrategyOptions: {
343 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
350 './tests/worker-files/thread/testWorker.mjs',
352 workerChoiceStrategyOptions: { weights: {} }
357 'Invalid worker choice strategy options: must have a weight for each worker node'
364 './tests/worker-files/thread/testWorker.mjs',
366 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
371 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Invalid tasks queue options: not a plain object.
378 './tests/worker-files/thread/testWorker.mjs',
380 enableTasksQueue: true,
381 tasksQueueOptions: 'invalidTasksQueueOptions'
385 new TypeError('Invalid tasks queue options: must be a plain object')
// Invalid tasks queue concurrency: zero, negative, non-integer.
391 './tests/worker-files/thread/testWorker.mjs',
393 enableTasksQueue: true,
394 tasksQueueOptions: { concurrency: 0 }
399 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
406 './tests/worker-files/thread/testWorker.mjs',
408 enableTasksQueue: true,
409 tasksQueueOptions: { concurrency: -1 }
414 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
421 './tests/worker-files/thread/testWorker.mjs',
423 enableTasksQueue: true,
424 tasksQueueOptions: { concurrency: 0.2 }
428 new TypeError('Invalid worker node tasks concurrency: must be an integer')
// Invalid tasks queue size: zero, negative, non-integer.
434 './tests/worker-files/thread/testWorker.mjs',
436 enableTasksQueue: true,
437 tasksQueueOptions: { size: 0 }
442 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
449 './tests/worker-files/thread/testWorker.mjs',
451 enableTasksQueue: true,
452 tasksQueueOptions: { size: -1 }
457 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
464 './tests/worker-files/thread/testWorker.mjs',
466 enableTasksQueue: true,
467 tasksQueueOptions: { size: 0.2 }
471 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Exercises setWorkerChoiceStrategyOptions() on a FixedThreadPool created with
// the FAIR_SHARE strategy. After each step the same options are expected on
// pool.opts, on the strategy context and on every registered strategy.
// The test ends with invalid option values and their expected error messages.
475 it('Verify that pool worker choice strategy options can be set', async () => {
476 const pool = new FixedThreadPool(
478 './tests/worker-files/thread/testWorker.mjs',
479 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
// Initial state: every median flag is false.
481 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
483 runTime: { median: false },
484 waitTime: { median: false },
485 elu: { median: false }
487 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
489 runTime: { median: false },
490 waitTime: { median: false },
491 elu: { median: false }
493 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
494 .workerChoiceStrategies) {
495 expect(workerChoiceStrategy.opts).toStrictEqual({
497 runTime: { median: false },
498 waitTime: { median: false },
499 elu: { median: false }
503 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch the median flag on for runTime and elu; waitTime is not passed and is
// expected to stay at median: false.
521 pool.setWorkerChoiceStrategyOptions({
522 runTime: { median: true },
523 elu: { median: true }
525 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true }
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
533 runTime: { median: true },
534 waitTime: { median: false },
535 elu: { median: true }
537 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
538 .workerChoiceStrategies) {
539 expect(workerChoiceStrategy.opts).toStrictEqual({
541 runTime: { median: true },
542 waitTime: { median: false },
543 elu: { median: true }
547 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Switch the runTime and elu median flags back off.
565 pool.setWorkerChoiceStrategyOptions({
566 runTime: { median: false },
567 elu: { median: false }
569 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
571 runTime: { median: false },
572 waitTime: { median: false },
573 elu: { median: false }
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
577 runTime: { median: false },
578 waitTime: { median: false },
579 elu: { median: false }
581 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
582 .workerChoiceStrategies) {
583 expect(workerChoiceStrategy.opts).toStrictEqual({
585 runTime: { median: false },
586 waitTime: { median: false },
587 elu: { median: false }
591 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
// Invalid inputs: non-object options, then invalid retries, weights and measurement.
610 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
613 'Invalid worker choice strategy options: must be a plain object'
617 pool.setWorkerChoiceStrategyOptions({
618 retries: 'invalidChoiceRetries'
622 'Invalid worker choice strategy options: retries must be an integer'
625 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
627 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
630 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
632 'Invalid worker choice strategy options: must have a weight for each worker node'
636 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
639 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Toggles the tasks queue at runtime through enableTasksQueue() and checks
// pool.opts after each call.
645 it('Verify that pool tasks queue can be enabled/disabled', async () => {
646 const pool = new FixedThreadPool(
648 './tests/worker-files/thread/testWorker.mjs'
// Disabled by default, with no tasks queue options.
650 expect(pool.opts.enableTasksQueue).toBe(false)
651 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Enabled without explicit options.
652 pool.enableTasksQueue(true)
653 expect(pool.opts.enableTasksQueue).toBe(true)
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
// Enabled again, this time with an explicit concurrency.
661 pool.enableTasksQueue(true, { concurrency: 2 })
662 expect(pool.opts.enableTasksQueue).toBe(true)
663 expect(pool.opts.tasksQueueOptions).toStrictEqual({
665 size: Math.pow(numberOfWorkers, 2),
667 tasksStealingOnBackPressure: true,
668 tasksFinishedTimeout: 2000
// Disabling the queue is expected to clear the tasks queue options.
670 pool.enableTasksQueue(false)
671 expect(pool.opts.enableTasksQueue).toBe(false)
672 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Exercises setTasksQueueOptions() on a FixedThreadPool created with the tasks
// queue enabled. After each update, pool.opts.tasksQueueOptions is compared with
// the expected object and every worker node's tasksQueueBackPressureSize is
// compared with the configured queue size. Invalid inputs are checked last.
676 it('Verify that pool tasks queue options can be set', async () => {
677 const pool = new FixedThreadPool(
679 './tests/worker-files/thread/testWorker.mjs',
680 { enableTasksQueue: true }
682 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 size: Math.pow(numberOfWorkers, 2),
686 tasksStealingOnBackPressure: true,
687 tasksFinishedTimeout: 2000
689 for (const workerNode of pool.workerNodes) {
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
// Update that turns off task stealing on back pressure and sets the finished timeout to 3000.
694 pool.setTasksQueueOptions({
698 tasksStealingOnBackPressure: false,
699 tasksFinishedTimeout: 3000
701 expect(pool.opts.tasksQueueOptions).toStrictEqual({
705 tasksStealingOnBackPressure: false,
706 tasksFinishedTimeout: 3000
708 for (const workerNode of pool.workerNodes) {
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
// Update without tasksFinishedTimeout: the expected options below show it back at 2000.
713 pool.setTasksQueueOptions({
716 tasksStealingOnBackPressure: true
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
720 size: Math.pow(numberOfWorkers, 2),
722 tasksStealingOnBackPressure: true,
723 tasksFinishedTimeout: 2000
725 for (const workerNode of pool.workerNodes) {
726 expect(workerNode.tasksQueueBackPressureSize).toBe(
727 pool.opts.tasksQueueOptions.size
// Invalid inputs: non-object options, then concurrency and size that are zero,
// negative or non-integer.
730 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
731 new TypeError('Invalid tasks queue options: must be a plain object')
733 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
735 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
738 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
740 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
743 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
744 new TypeError('Invalid worker node tasks concurrency: must be an integer')
746 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
748 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
751 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
753 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
756 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
757 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Compares pool.info with the expected snapshot for two pool flavours:
// a fixed thread pool and a dynamic cluster pool.
762 it('Verify that pool info is set', async () => {
763 let pool = new FixedThreadPool(
765 './tests/worker-files/thread/testWorker.mjs'
// Fixed pool: minimum size, maximum size, worker nodes and idle worker nodes
// are all expected to equal numberOfWorkers.
767 expect(pool.info).toStrictEqual({
769 type: PoolTypes.fixed,
770 worker: WorkerTypes.thread,
773 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
774 minSize: numberOfWorkers,
775 maxSize: numberOfWorkers,
776 workerNodes: numberOfWorkers,
777 idleWorkerNodes: numberOfWorkers,
784 pool = new DynamicClusterPool(
785 Math.floor(numberOfWorkers / 2),
787 './tests/worker-files/cluster/testWorker.js'
// Dynamic pool: worker nodes and idle worker nodes are expected to equal the
// minimum size, while maxSize is numberOfWorkers.
789 expect(pool.info).toStrictEqual({
791 type: PoolTypes.dynamic,
792 worker: WorkerTypes.cluster,
795 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
796 minSize: Math.floor(numberOfWorkers / 2),
797 maxSize: numberOfWorkers,
798 workerNodes: Math.floor(numberOfWorkers / 2),
799 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
// For a new FixedClusterPool, every worker node must be a WorkerNode whose usage
// strictly equals the expected initial object, with newly constructed
// CircularArray instances as histories.
808 it('Verify that pool worker tasks usage are initialized', async () => {
809 const pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.js'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.usage).toStrictEqual({
821 sequentiallyStolen: 0,
826 history: new CircularArray()
829 history: new CircularArray()
833 history: new CircularArray()
836 history: new CircularArray()
// For both a FixedClusterPool and a DynamicThreadPool, every worker node must be
// a WorkerNode whose tasksQueue is a Deque with size 0 and maxSize 0.
844 it('Verify that pool worker tasks queue are initialized', async () => {
845 let pool = new FixedClusterPool(
847 './tests/worker-files/cluster/testWorker.js'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
856 pool = new DynamicThreadPool(
857 Math.floor(numberOfWorkers / 2),
859 './tests/worker-files/thread/testWorker.mjs'
861 for (const workerNode of pool.workerNodes) {
862 expect(workerNode).toBeInstanceOf(WorkerNode)
863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
864 expect(workerNode.tasksQueue.size).toBe(0)
865 expect(workerNode.tasksQueue.maxSize).toBe(0)
// For both a FixedClusterPool and a DynamicThreadPool, every worker node must be
// a WorkerNode whose info carries a numeric id and the worker type of its pool.
870 it('Verify that pool worker info are initialized', async () => {
871 let pool = new FixedClusterPool(
873 './tests/worker-files/cluster/testWorker.js'
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
877 expect(workerNode.info).toStrictEqual({
878 id: expect.any(Number),
879 type: WorkerTypes.cluster,
885 pool = new DynamicThreadPool(
886 Math.floor(numberOfWorkers / 2),
888 './tests/worker-files/thread/testWorker.mjs'
890 for (const workerNode of pool.workerNodes) {
891 expect(workerNode).toBeInstanceOf(WorkerNode)
892 expect(workerNode.info).toStrictEqual({
893 id: expect.any(Number),
894 type: WorkerTypes.thread,
// Checks that start() and destroy() refuse to run twice: starting an already
// started pool throws, and destroying an already destroyed pool rejects.
902 it('Verify that pool statuses are checked at start or destroy', async () => {
903 const pool = new FixedThreadPool(
905 './tests/worker-files/thread/testWorker.mjs'
907 expect(pool.info.started).toBe(true)
908 expect(pool.info.ready).toBe(true)
909 expect(() => pool.start()).toThrow(
910 new Error('Cannot start an already started pool')
// NOTE(review): the flags below are expected to be false, so the pool has
// presumably been destroyed just before this point — confirm.
913 expect(pool.info.started).toBe(false)
914 expect(pool.info.ready).toBe(false)
915 await expect(pool.destroy()).rejects.toThrow(
916 new Error('Cannot destroy an already destroyed pool')
// Checks a FixedClusterPool in two states: not yet started, then started.
// NOTE(review): the pool is presumably created with worker start-up disabled and
// started explicitly between the two groups of assertions — confirm.
920 it('Verify that pool can be started after initialization', async () => {
921 const pool = new FixedClusterPool(
923 './tests/worker-files/cluster/testWorker.js',
// Not started: no worker nodes, no 'ready' event emitted, and execute() rejects.
928 expect(pool.info.started).toBe(false)
929 expect(pool.info.ready).toBe(false)
930 expect(pool.readyEventEmitted).toBe(false)
931 expect(pool.workerNodes).toStrictEqual([])
932 await expect(pool.execute()).rejects.toThrow(
933 new Error('Cannot execute a task on not started pool')
// Started: flags are true, the 'ready' event is awaited once, and the pool holds
// numberOfWorkers WorkerNode instances.
936 expect(pool.info.started).toBe(true)
937 expect(pool.info.ready).toBe(true)
938 await waitPoolEvents(pool, PoolEvents.ready, 1)
939 expect(pool.readyEventEmitted).toBe(true)
940 expect(pool.workerNodes.length).toBe(numberOfWorkers)
941 for (const workerNode of pool.workerNodes) {
942 expect(workerNode).toBeInstanceOf(WorkerNode)
// Checks the argument validation of execute(data, name, transferList) on a
// FixedClusterPool.
947 it('Verify that pool execute() arguments are checked', async () => {
948 const pool = new FixedClusterPool(
950 './tests/worker-files/cluster/testWorker.js'
// name that is not a string.
952 await expect(pool.execute(undefined, 0)).rejects.toThrow(
953 new TypeError('name argument must be a string')
// name that is an empty string.
955 await expect(pool.execute(undefined, '')).rejects.toThrow(
956 new TypeError('name argument must not be an empty string')
// transferList that is not an array.
958 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
959 new TypeError('transferList argument must be an array')
// Unknown task function: the rejection value is compared with toBe against a
// plain string, unlike the Error-based checks above.
961 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
962 "Task function 'unknown' not found"
// NOTE(review): this rejection is the "not started pool" error, so the pool has
// presumably been destroyed just before this call — confirm.
965 await expect(pool.execute()).rejects.toThrow(
966 new Error('Cannot execute a task on not started pool')
// Submits numberOfWorkers * maxMultiplier tasks to a FixedClusterPool and checks
// each worker node's usage twice: before the tasks settle and after.
970 it('Verify that pool worker tasks usage are computed', async () => {
971 const pool = new FixedClusterPool(
973 './tests/worker-files/cluster/testWorker.js'
975 const promises = new Set()
976 const maxMultiplier = 2
977 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
978 promises.add(pool.execute())
// Before awaiting: every worker node is expected to report maxMultiplier
// executing tasks.
980 for (const workerNode of pool.workerNodes) {
981 expect(workerNode.usage).toStrictEqual({
984 executing: maxMultiplier,
987 sequentiallyStolen: 0,
992 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
999 history: expect.any(CircularArray)
1002 history: expect.any(CircularArray)
// After all tasks settle: every worker node is expected to report maxMultiplier
// executed tasks.
1007 await Promise.all(promises)
1008 for (const workerNode of pool.workerNodes) {
1009 expect(workerNode.usage).toStrictEqual({
1011 executed: maxMultiplier,
1015 sequentiallyStolen: 0,
1020 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1027 history: expect.any(CircularArray)
1030 history: expect.any(CircularArray)
1035 await pool.destroy()
// Runs tasks on a DynamicThreadPool, then switches the worker choice strategy to
// FAIR_SHARE and checks each worker node's usage against the expected object
// after the switch.
1038 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1039 const pool = new DynamicThreadPool(
1040 Math.floor(numberOfWorkers / 2),
1042 './tests/worker-files/thread/testWorker.mjs'
1044 const promises = new Set()
1045 const maxMultiplier = 2
1046 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1047 promises.add(pool.execute())
1049 await Promise.all(promises)
// Before the switch: the per-node executed count is only matched as a number,
// then bounded to (0, numberOfWorkers * maxMultiplier].
1050 for (const workerNode of pool.workerNodes) {
1051 expect(workerNode.usage).toStrictEqual({
1053 executed: expect.any(Number),
1057 sequentiallyStolen: 0,
1062 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1069 history: expect.any(CircularArray)
1072 history: expect.any(CircularArray)
1076 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1077 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1078 numberOfWorkers * maxMultiplier
1080 expect(workerNode.usage.runTime.history.length).toBe(0)
1081 expect(workerNode.usage.waitTime.history.length).toBe(0)
1082 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1083 expect(workerNode.usage.elu.active.history.length).toBe(0)
// After the switch: usage is compared again and all histories must be empty.
1085 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1086 for (const workerNode of pool.workerNodes) {
1087 expect(workerNode.usage).toStrictEqual({
1093 sequentiallyStolen: 0,
1098 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1105 history: expect.any(CircularArray)
1108 history: expect.any(CircularArray)
1112 expect(workerNode.usage.runTime.history.length).toBe(0)
1113 expect(workerNode.usage.waitTime.history.length).toBe(0)
1114 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1115 expect(workerNode.usage.elu.active.history.length).toBe(0)
1117 await pool.destroy()
// Registers a listener for the 'ready' pool event on a DynamicClusterPool, waits
// for the event once, then checks the call count and the info payload received.
1120 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1121 const pool = new DynamicClusterPool(
1122 Math.floor(numberOfWorkers / 2),
1124 './tests/worker-files/cluster/testWorker.js'
// No listener is registered on the emitter before the test adds one.
1126 expect(pool.emitter.eventNames()).toStrictEqual([])
1129 pool.emitter.on(PoolEvents.ready, info => {
1133 await waitPoolEvents(pool, PoolEvents.ready, 1)
1134 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1135 expect(poolReady).toBe(1)
// Counters in the payload are only matched by type, not by exact value.
1136 expect(poolInfo).toStrictEqual({
1138 type: PoolTypes.dynamic,
1139 worker: WorkerTypes.cluster,
1142 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1143 minSize: expect.any(Number),
1144 maxSize: expect.any(Number),
1145 workerNodes: expect.any(Number),
1146 idleWorkerNodes: expect.any(Number),
1147 busyWorkerNodes: expect.any(Number),
1148 executedTasks: expect.any(Number),
1149 executingTasks: expect.any(Number),
1150 failedTasks: expect.any(Number)
1152 await pool.destroy()
// Registers a listener for the 'busy' pool event on a FixedThreadPool, submits
// numberOfWorkers * 2 tasks, then checks how many times the event fired and the
// info payload received.
1155 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1156 const pool = new FixedThreadPool(
1158 './tests/worker-files/thread/testWorker.mjs'
// No listener is registered on the emitter before the test adds one.
1160 expect(pool.emitter.eventNames()).toStrictEqual([])
1161 const promises = new Set()
1164 pool.emitter.on(PoolEvents.busy, info => {
1168 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1169 for (let i = 0; i < numberOfWorkers * 2; i++) {
1170 promises.add(pool.execute())
1172 await Promise.all(promises)
1173 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1174 // So it fires numberOfWorkers + 1 times in total for a loop submitting numberOfWorkers * 2 tasks to the fixed pool.
1175 expect(poolBusy).toBe(numberOfWorkers + 1)
// Counters in the payload are only matched by type, not by exact value.
1176 expect(poolInfo).toStrictEqual({
1178 type: PoolTypes.fixed,
1179 worker: WorkerTypes.thread,
1182 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1183 minSize: expect.any(Number),
1184 maxSize: expect.any(Number),
1185 workerNodes: expect.any(Number),
1186 idleWorkerNodes: expect.any(Number),
1187 busyWorkerNodes: expect.any(Number),
1188 executedTasks: expect.any(Number),
1189 executingTasks: expect.any(Number),
1190 failedTasks: expect.any(Number)
1192 await pool.destroy()
// Registers a listener for the 'full' pool event on a DynamicThreadPool, submits
// numberOfWorkers * 2 tasks, then expects the event to have fired exactly once
// and checks the info payload received.
1195 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1196 const pool = new DynamicThreadPool(
1197 Math.floor(numberOfWorkers / 2),
1199 './tests/worker-files/thread/testWorker.mjs'
// No listener is registered on the emitter before the test adds one.
1201 expect(pool.emitter.eventNames()).toStrictEqual([])
1202 const promises = new Set()
1205 pool.emitter.on(PoolEvents.full, info => {
1209 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1210 for (let i = 0; i < numberOfWorkers * 2; i++) {
1211 promises.add(pool.execute())
1213 await Promise.all(promises)
1214 expect(poolFull).toBe(1)
// Counters in the payload are only matched by type, not by exact value.
1215 expect(poolInfo).toStrictEqual({
1217 type: PoolTypes.dynamic,
1218 worker: WorkerTypes.thread,
1221 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1222 minSize: expect.any(Number),
1223 maxSize: expect.any(Number),
1224 workerNodes: expect.any(Number),
1225 idleWorkerNodes: expect.any(Number),
1226 busyWorkerNodes: expect.any(Number),
1227 executedTasks: expect.any(Number),
1228 executingTasks: expect.any(Number),
1229 failedTasks: expect.any(Number)
1231 await pool.destroy()
// Registers a listener for the 'backPressure' pool event on a FixedThreadPool
// with the tasks queue enabled, submits numberOfWorkers + 1 tasks, then expects
// the event to have fired exactly once and checks the info payload received.
1234 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1235 const pool = new FixedThreadPool(
1237 './tests/worker-files/thread/testWorker.mjs',
1239 enableTasksQueue: true
// Force the pool to report back pressure regardless of the real queue state.
1242 stub(pool, 'hasBackPressure').returns(true)
1243 expect(pool.emitter.eventNames()).toStrictEqual([])
1244 const promises = new Set()
1245 let poolBackPressure = 0
1247 pool.emitter.on(PoolEvents.backPressure, info => {
1251 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1252 for (let i = 0; i < numberOfWorkers + 1; i++) {
1253 promises.add(pool.execute())
1255 await Promise.all(promises)
1256 expect(poolBackPressure).toBe(1)
// With the tasks queue enabled, the payload also carries queue-related counters
// (maxQueuedTasks, queuedTasks, stolenTasks).
1257 expect(poolInfo).toStrictEqual({
1259 type: PoolTypes.fixed,
1260 worker: WorkerTypes.thread,
1263 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1264 minSize: expect.any(Number),
1265 maxSize: expect.any(Number),
1266 workerNodes: expect.any(Number),
1267 idleWorkerNodes: expect.any(Number),
1268 busyWorkerNodes: expect.any(Number),
1269 executedTasks: expect.any(Number),
1270 executingTasks: expect.any(Number),
1271 maxQueuedTasks: expect.any(Number),
1272 queuedTasks: expect.any(Number),
1274 stolenTasks: expect.any(Number),
1275 failedTasks: expect.any(Number)
// The stub records its invocations; the pool is expected to have consulted it 5 times.
1277 expect(pool.hasBackPressure.callCount).toBe(5)
1278 await pool.destroy()
// With a 2500 ms tasksFinishedTimeout, destroy() is expected to let all
// numberOfWorkers * maxMultiplier tasks finish before the pool is destroyed.
1281 it('Verify that destroy() waits for queued tasks to finish', async () => {
1282 const tasksFinishedTimeout = 2500
1283 const pool = new FixedThreadPool(
1285 './tests/worker-files/thread/asyncWorker.mjs',
1287 enableTasksQueue: true,
1288 tasksQueueOptions: { tasksFinishedTimeout }
1291 const maxMultiplier = 4
1292 let tasksFinished = 0
// Listen for the 'taskFinished' event on each worker node.
1293 for (const workerNode of pool.workerNodes) {
1294 workerNode.on('taskFinished', () => {
1298 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
// Some tasks must still be queued when destroy() is called.
1301 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1302 const startTime = performance.now()
1303 await pool.destroy()
1304 const elapsedTime = performance.now() - startTime
1305 expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
// NOTE(review): the 2000 ms lower bound presumably reflects how long the async
// worker needs to drain the queued tasks — confirm against asyncWorker.mjs.
1306 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
// Allow 100 ms of slack above the configured timeout.
1307 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
// With a 1000 ms tasksFinishedTimeout, destroy() is expected to return within the
// timeout (plus slack) without any 'taskFinished' event having been counted.
1310 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1311 const tasksFinishedTimeout = 1000
1312 const pool = new FixedThreadPool(
1314 './tests/worker-files/thread/asyncWorker.mjs',
1316 enableTasksQueue: true,
1317 tasksQueueOptions: { tasksFinishedTimeout }
1320 const maxMultiplier = 4
1321 let tasksFinished = 0
// Listen for the 'taskFinished' event on each worker node.
1322 for (const workerNode of pool.workerNodes) {
1323 workerNode.on('taskFinished', () => {
1327 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
// Some tasks must still be queued when destroy() is called.
1330 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1331 const startTime = performance.now()
1332 await pool.destroy()
1333 const elapsedTime = performance.now() - startTime
1334 expect(tasksFinished).toBe(0)
// Allow 300 ms of slack above the configured timeout.
1335 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 300)
// Uses an async_hooks hook to observe the async resource of type
// 'poolifier:task', then expects each counter to be exactly 1 after a single
// execute() call.
1338 it('Verify that pool asynchronous resource track tasks execution', async () => {
1343 let resolveCalls = 0
1344 const hook = createHook({
// Remember the async id of the task resource so the other callbacks can filter on it.
1345 init (asyncId, type) {
1346 if (type === 'poolifier:task') {
1348 taskAsyncId = asyncId
1352 if (asyncId === taskAsyncId) beforeCalls++
1355 if (asyncId === taskAsyncId) afterCalls++
// Counted when the currently executing async id is the task's.
1358 if (executionAsyncId() === taskAsyncId) resolveCalls++
1361 const pool = new FixedThreadPool(
1363 './tests/worker-files/thread/testWorker.mjs'
1366 await pool.execute()
1368 expect(initCalls).toBe(1)
1369 expect(beforeCalls).toBe(1)
1370 expect(afterCalls).toBe(1)
1371 expect(resolveCalls).toBe(1)
1372 await pool.destroy()
// Checks hasTaskFunction() against the multiple-task-functions test workers, on
// a DynamicThreadPool and then on a FixedClusterPool. The default task name,
// 'factorial' and 'fibonacci' must be reported as present and 'unknown' as absent.
1375 it('Verify that hasTaskFunction() is working', async () => {
1376 const dynamicThreadPool = new DynamicThreadPool(
1377 Math.floor(numberOfWorkers / 2),
1379 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Wait for the pool's 'ready' event before querying task functions.
1381 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1382 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1383 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1386 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1387 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1388 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1389 await dynamicThreadPool.destroy()
1390 const fixedClusterPool = new FixedClusterPool(
1392 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1394 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1395 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1396 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1399 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1400 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1401 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1402 await fixedClusterPool.destroy()
// Exercises addTaskFunction() on a DynamicThreadPool: argument validation first,
// then registering an 'echo' task function, executing it, and checking the
// per-task-function usage recorded on each worker node.
1405 it('Verify that addTaskFunction() is working', async () => {
1406 const dynamicThreadPool = new DynamicThreadPool(
1407 Math.floor(numberOfWorkers / 2),
1409 './tests/worker-files/thread/testWorker.mjs'
1411 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Invalid name: not a string, then an empty string.
1413 dynamicThreadPool.addTaskFunction(0, () => {})
1414 ).rejects.toThrow(new TypeError('name argument must be a string'))
1416 dynamicThreadPool.addTaskFunction('', () => {})
1418 new TypeError('name argument must not be an empty string')
// Invalid fn: a number, then a string.
1420 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1421 new TypeError('fn argument must be a function')
1423 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1424 new TypeError('fn argument must be a function')
1426 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1430 const echoTaskFunction = data => {
// Successful registration resolves to true and records the function in
// pool.taskFunctions.
1434 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1435 ).resolves.toBe(true)
1436 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1437 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1440 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Executing 'echo' must give back a result strictly equal to the input data.
1445 const taskFunctionData = { test: 'test' }
1446 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1447 expect(echoResult).toStrictEqual(taskFunctionData)
1448 for (const workerNode of dynamicThreadPool.workerNodes) {
1449 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1451 executed: expect.any(Number),
1454 sequentiallyStolen: 0,
1459 history: new CircularArray()
1462 history: new CircularArray()
1466 history: new CircularArray()
1469 history: new CircularArray()
1474 await dynamicThreadPool.destroy()
// Exercises removeTaskFunction() on a DynamicThreadPool: removing a name the
// pool does not handle must reject, while removing a task function previously
// added through the pool must resolve and clear it from `pool.taskFunctions`.
1477 it('Verify that removeTaskFunction() is working', async () => {
1478 const dynamicThreadPool = new DynamicThreadPool(
1479 Math.floor(numberOfWorkers / 2),
1481 './tests/worker-files/thread/testWorker.mjs'
1483 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// Names listed by the pool before anything is added or removed.
1484 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Nothing has been added through the pool at this point, so removing 'test'
// must reject.
1488 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1489 new Error('Cannot remove a task function not handled on the pool side')
1491 const echoTaskFunction = data => {
// Register 'echo' through the pool and check the pool-side registry holds it.
1494 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1495 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1496 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1499 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Removing 'echo' must resolve, after which the registry is empty again.
1504 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1507 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1508 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1509 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1513 await dynamicThreadPool.destroy()
// Checks listTaskFunctionNames() against workers that declare several task
// functions, once with a thread-based pool and once with a cluster-based pool.
1516 it('Verify that listTaskFunctionNames() is working', async () => {
1517 const dynamicThreadPool = new DynamicThreadPool(
1518 Math.floor(numberOfWorkers / 2),
1520 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1522 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1523 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1525 'jsonIntegerSerialization',
1529 await dynamicThreadPool.destroy()
// Same check with a FixedClusterPool and the cluster flavour of the worker file.
1530 const fixedClusterPool = new FixedClusterPool(
1532 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1534 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1535 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1537 'jsonIntegerSerialization',
1541 await fixedClusterPool.destroy()
// Exercises setDefaultTaskFunction() on a DynamicThreadPool backed by a worker
// with several task functions:
//  - a non-string name, the reserved DEFAULT_TASK_NAME and an unknown name must
//    each reject with a "Task function operation 'default' failed on worker
//    <id>" message;
//  - setting 'factorial' and then 'fibonacci' as default resolves to true, and
//    the listed task function names are re-checked after each change.
1544 it('Verify that setDefaultTaskFunction() is working', async () => {
1545 const dynamicThreadPool = new DynamicThreadPool(
1546 Math.floor(numberOfWorkers / 2),
1548 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1550 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The first worker node's id is interpolated into the expected error messages.
1551 const workerId = dynamicThreadPool.workerNodes[0].info.id
1552 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1554 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1558 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1561 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1565 dynamicThreadPool.setDefaultTaskFunction('unknown')
1568 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// Names listed after the rejected attempts, before any successful change.
1571 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1573 'jsonIntegerSerialization',
1578 dynamicThreadPool.setDefaultTaskFunction('factorial')
1579 ).resolves.toBe(true)
// Names listed once 'factorial' has been set as the default.
1580 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1583 'jsonIntegerSerialization',
1587 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1588 ).resolves.toBe(true)
// Names listed once 'fibonacci' has been set as the default.
1589 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1592 'jsonIntegerSerialization',
1595 await dynamicThreadPool.destroy()
// End-to-end check of a cluster worker exposing several task functions: runs
// the default task function and each named one on the same input, then
// verifies the pool-level task counters and the per-worker, per-task-function
// usage records.
1598 it('Verify that multiple task functions worker is working', async () => {
1599 const pool = new DynamicClusterPool(
1600 Math.floor(numberOfWorkers / 2),
1602 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
// One input reused for all four executions; omitting the name runs the default
// task function.
1604 const data = { n: 10 }
1605 const result0 = await pool.execute(data)
1606 expect(result0).toStrictEqual({ ok: 1 })
1607 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1608 expect(result1).toStrictEqual({ ok: 1 })
1609 const result2 = await pool.execute(data, 'factorial')
1610 expect(result2).toBe(3628800)
1611 const result3 = await pool.execute(data, 'fibonacci')
1612 expect(result3).toBe(55)
// All four tasks were awaited above, so none is executing and four are counted.
1613 expect(pool.info.executingTasks).toBe(0)
1614 expect(pool.info.executedTasks).toBe(4)
1615 for (const workerNode of pool.workerNodes) {
1616 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1618 'jsonIntegerSerialization',
1622 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Each listed task function must have a usage record on this worker node;
// histories are only checked to be CircularArray instances.
1623 for (const name of pool.listTaskFunctionNames()) {
1624 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1626 executed: expect.any(Number),
1630 sequentiallyStolen: 0,
1634 history: expect.any(CircularArray)
1637 history: expect.any(CircularArray)
1641 history: expect.any(CircularArray)
1644 history: expect.any(CircularArray)
1649 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1650 ).toBeGreaterThan(0)
1653 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1655 workerNode.getTaskFunctionWorkerUsage(
1656 workerNode.info.taskFunctionNames[1]
1660 await pool.destroy()
// Checks sendKillMessageToWorker() on a DynamicClusterPool: the call resolves
// to undefined for worker node key 0 and rejects with an "Invalid worker node
// key" error when given `numberOfWorkers` as the key.
1663 it('Verify sendKillMessageToWorker()', async () => {
1664 const pool = new DynamicClusterPool(
1665 Math.floor(numberOfWorkers / 2),
1667 './tests/worker-files/cluster/testWorker.js'
// Key of the first worker node in the pool.
1669 const workerNodeKey = 0
1671 pool.sendKillMessageToWorker(workerNodeKey)
1672 ).resolves.toBeUndefined()
1674 pool.sendKillMessageToWorker(numberOfWorkers)
1675 ).rejects.toStrictEqual(
1676 new Error(`Invalid worker node key '${numberOfWorkers}'`)
1678 await pool.destroy()
// Checks sendTaskFunctionOperationToWorker() on a DynamicClusterPool: sending
// an 'add' operation for a task function named 'empty' to worker node 0 must
// resolve to true, and that worker node must then list 'empty' after its
// existing task function names.
1681 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1682 const pool = new DynamicClusterPool(
1683 Math.floor(numberOfWorkers / 2),
1685 './tests/worker-files/cluster/testWorker.js'
// Key of the first worker node in the pool.
1687 const workerNodeKey = 0
1689 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1690 taskFunctionOperation: 'add',
1691 taskFunctionName: 'empty',
1692 taskFunction: (() => {}).toString()
1694 ).resolves.toBe(true)
1696 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1697 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1698 await pool.destroy()
// Checks sendTaskFunctionOperationToWorkers() on a DynamicClusterPool: sending
// an 'add' operation for a task function named 'empty' must resolve to true,
// after which the task function names of every worker node are verified.
1701 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1702 const pool = new DynamicClusterPool(
1703 Math.floor(numberOfWorkers / 2),
1705 './tests/worker-files/cluster/testWorker.js'
1708 pool.sendTaskFunctionOperationToWorkers({
1709 taskFunctionOperation: 'add',
1710 taskFunctionName: 'empty',
1711 taskFunction: (() => {}).toString()
1713 ).resolves.toBe(true)
// The operation is checked on each worker node, not just the first one.
1714 for (const workerNode of pool.workerNodes) {
1715 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1721 await pool.destroy()