1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.js'
18 import { CircularArray } from '../../lib/circular-array.js'
19 import { Deque } from '../../lib/deque.js'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
21 import { waitPoolEvents } from '../test-utils.js'
22 import { WorkerNode } from '../../lib/pools/worker-node.js'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
// Builds a FixedThreadPool from the thread test worker file and checks that
// the resulting object is a FixedThreadPool instance.
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
// Instantiates the StubPoolWithIsMain test double with the thread test worker
// file and an error handler that logs through console.error. The message this
// test is paired with is
// 'Cannot start a pool from a worker with the same type as the pool'.
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
// Checks the status flags of a freshly constructed FixedThreadPool:
// `started` is true while `starting` and `destroying` are both false.
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
// Covers the worker file path validation of the FixedThreadPool constructor:
// - no path at all throws TypeError('The worker file path must be specified')
// - a non-string path (0) throws TypeError('The worker file path must be a string')
// - a path to a missing file ('./dummyWorker.ts') throws a plain Error
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
// Covers the number-of-workers check. The message this test is paired with is
// 'Cannot instantiate a pool without specifying the number of workers'.
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
// Builds a FixedClusterPool with -1 workers; the message this test is paired
// with is 'Cannot instantiate a pool with a negative number of workers'.
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
111 'Cannot instantiate a pool with a negative number of workers'
// Builds a FixedThreadPool with a fractional worker count (0.25); the message
// this test is paired with is
// 'Cannot instantiate a pool with a non safe integer number of workers'.
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
// Exercises the size validation of the DynamicClusterPool and
// DynamicThreadPool constructors. Six constructions are attempted, each paired
// with one message, in this order:
// 1. maximum pool size not specified
// 2. non safe integer number of workers
// 3. non safe integer maximum pool size
// 4. maximum pool size inferior to the minimum pool size
// 5. maximum pool size equal to zero
// 6. minimum pool size equal to the maximum pool size (fixed pool advised)
127 it('Verify that dynamic pool sizing is checked', () => {
130 new DynamicClusterPool(
133 './tests/worker-files/cluster/testWorker.js'
137 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
142 new DynamicThreadPool(
145 './tests/worker-files/thread/testWorker.mjs'
149 'Cannot instantiate a pool with a non safe integer number of workers'
154 new DynamicClusterPool(
157 './tests/worker-files/cluster/testWorker.js'
161 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
166 new DynamicThreadPool(
169 './tests/worker-files/thread/testWorker.mjs'
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
178 new DynamicThreadPool(
181 './tests/worker-files/thread/testWorker.mjs'
185 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
190 new DynamicClusterPool(
193 './tests/worker-files/cluster/testWorker.js'
197 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
// Two-phase check of how pool options end up on the pool.
// Phase 1: a FixedThreadPool built from the worker file alone exposes an
// EventEmitterAsyncResource as `emitter`, and its options include
// restartWorkerOnError true, tasks queue disabled, the ROUND_ROBIN strategy
// and non-median runTime/waitTime/elu settings. The same strategy options are
// asserted on the worker choice strategy context and on every strategy it holds.
// Phase 2: the pool is rebuilt with explicit options (LEAST_USED strategy,
// median runTime, per-worker weights, tasks queue with concurrency 2, and one
// shared handler for message/error/online/exit). `emitter` is then expected to
// be undefined, and the supplied values are expected on pool.opts, on the
// strategy context and on every strategy.
202 it('Verify that pool options are checked', async () => {
203 let pool = new FixedThreadPool(
205 './tests/worker-files/thread/testWorker.mjs'
207 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
208 expect(pool.opts).toStrictEqual({
211 restartWorkerOnError: true,
212 enableTasksQueue: false,
213 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
214 workerChoiceStrategyOptions: {
216 runTime: { median: false },
217 waitTime: { median: false },
218 elu: { median: false }
221 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
223 runTime: { median: false },
224 waitTime: { median: false },
225 elu: { median: false }
227 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
228 .workerChoiceStrategies) {
229 expect(workerChoiceStrategy.opts).toStrictEqual({
231 runTime: { median: false },
232 waitTime: { median: false },
233 elu: { median: false }
237 const testHandler = () => console.info('test handler executed')
238 pool = new FixedThreadPool(
240 './tests/worker-files/thread/testWorker.mjs',
242 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
243 workerChoiceStrategyOptions: {
244 runTime: { median: true },
245 weights: { 0: 300, 1: 200 }
248 restartWorkerOnError: false,
249 enableTasksQueue: true,
250 tasksQueueOptions: { concurrency: 2 },
251 messageHandler: testHandler,
252 errorHandler: testHandler,
253 onlineHandler: testHandler,
254 exitHandler: testHandler
257 expect(pool.emitter).toBeUndefined()
258 expect(pool.opts).toStrictEqual({
261 restartWorkerOnError: false,
262 enableTasksQueue: true,
265 size: Math.pow(numberOfWorkers, 2),
267 tasksStealingOnBackPressure: true,
268 tasksFinishedTimeout: 1000
270 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
271 workerChoiceStrategyOptions: {
273 runTime: { median: true },
274 waitTime: { median: false },
275 elu: { median: false },
276 weights: { 0: 300, 1: 200 }
278 onlineHandler: testHandler,
279 messageHandler: testHandler,
280 errorHandler: testHandler,
281 exitHandler: testHandler
283 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
285 runTime: { median: true },
286 waitTime: { median: false },
287 elu: { median: false },
288 weights: { 0: 300, 1: 200 }
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
// Feeds invalid option values to the pool constructor, one per construction,
// each paired with an error message:
// - unknown worker choice strategy name
// - worker choice strategy options: non-integer retries, a second retries
//   message for the value -1, empty weights, unknown measurement
// - tasks queue options: non-object value, concurrency 0 / -1 / 0.2,
//   size 0 / -1 / 0.2
// Non-object and non-integer values are matched against TypeError instances.
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: {
321 retries: 'invalidChoiceRetries'
327 'Invalid worker choice strategy options: retries must be an integer'
334 './tests/worker-files/thread/testWorker.mjs',
336 workerChoiceStrategyOptions: {
343 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
350 './tests/worker-files/thread/testWorker.mjs',
352 workerChoiceStrategyOptions: { weights: {} }
357 'Invalid worker choice strategy options: must have a weight for each worker node'
364 './tests/worker-files/thread/testWorker.mjs',
366 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
371 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
378 './tests/worker-files/thread/testWorker.mjs',
380 enableTasksQueue: true,
381 tasksQueueOptions: 'invalidTasksQueueOptions'
385 new TypeError('Invalid tasks queue options: must be a plain object')
391 './tests/worker-files/thread/testWorker.mjs',
393 enableTasksQueue: true,
394 tasksQueueOptions: { concurrency: 0 }
399 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
406 './tests/worker-files/thread/testWorker.mjs',
408 enableTasksQueue: true,
409 tasksQueueOptions: { concurrency: -1 }
414 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
421 './tests/worker-files/thread/testWorker.mjs',
423 enableTasksQueue: true,
424 tasksQueueOptions: { concurrency: 0.2 }
428 new TypeError('Invalid worker node tasks concurrency: must be an integer')
434 './tests/worker-files/thread/testWorker.mjs',
436 enableTasksQueue: true,
437 tasksQueueOptions: { size: 0 }
442 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
449 './tests/worker-files/thread/testWorker.mjs',
451 enableTasksQueue: true,
452 tasksQueueOptions: { size: -1 }
457 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
464 './tests/worker-files/thread/testWorker.mjs',
466 enableTasksQueue: true,
467 tasksQueueOptions: { size: 0.2 }
471 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Drives pool.setWorkerChoiceStrategyOptions() on a FixedThreadPool created
// with the FAIR_SHARE strategy.
// The same three places are inspected after construction and after each
// update: pool.opts.workerChoiceStrategyOptions, the strategy context options,
// and the options of every strategy held by the context. The context's
// getTaskStatisticsRequirements() is also consulted at each stage.
// Sequence: initial state (all median flags false), then runTime and elu
// median switched on (waitTime stays false), then both switched back off.
// The tail of the test passes invalid values (non-object, non-integer
// retries, retries -1, empty weights, unknown measurement), each paired with
// an error message.
475 it('Verify that pool worker choice strategy options can be set', async () => {
476 const pool = new FixedThreadPool(
478 './tests/worker-files/thread/testWorker.mjs',
479 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
481 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
483 runTime: { median: false },
484 waitTime: { median: false },
485 elu: { median: false }
487 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
489 runTime: { median: false },
490 waitTime: { median: false },
491 elu: { median: false }
493 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
494 .workerChoiceStrategies) {
495 expect(workerChoiceStrategy.opts).toStrictEqual({
497 runTime: { median: false },
498 waitTime: { median: false },
499 elu: { median: false }
503 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
521 pool.setWorkerChoiceStrategyOptions({
522 runTime: { median: true },
523 elu: { median: true }
525 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
527 runTime: { median: true },
528 waitTime: { median: false },
529 elu: { median: true }
531 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
533 runTime: { median: true },
534 waitTime: { median: false },
535 elu: { median: true }
537 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
538 .workerChoiceStrategies) {
539 expect(workerChoiceStrategy.opts).toStrictEqual({
541 runTime: { median: true },
542 waitTime: { median: false },
543 elu: { median: true }
547 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
565 pool.setWorkerChoiceStrategyOptions({
566 runTime: { median: false },
567 elu: { median: false }
569 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
571 runTime: { median: false },
572 waitTime: { median: false },
573 elu: { median: false }
575 expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
577 runTime: { median: false },
578 waitTime: { median: false },
579 elu: { median: false }
581 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
582 .workerChoiceStrategies) {
583 expect(workerChoiceStrategy.opts).toStrictEqual({
585 runTime: { median: false },
586 waitTime: { median: false },
587 elu: { median: false }
591 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
610 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
613 'Invalid worker choice strategy options: must be a plain object'
617 pool.setWorkerChoiceStrategyOptions({
618 retries: 'invalidChoiceRetries'
622 'Invalid worker choice strategy options: retries must be an integer'
625 expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
627 "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
630 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
632 'Invalid worker choice strategy options: must have a weight for each worker node'
636 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
639 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
// Toggles the tasks queue at runtime through pool.enableTasksQueue().
// Starting from a pool with the queue disabled (tasksQueueOptions undefined),
// the test enables it without options, enables it again with
// { concurrency: 2 }, then disables it. After each enable the resulting
// tasksQueueOptions are compared to an object whose size is
// numberOfWorkers squared, with task stealing on back pressure enabled and a
// 1000 tasksFinishedTimeout; after disabling, the options are undefined again.
645 it('Verify that pool tasks queue can be enabled/disabled', async () => {
646 const pool = new FixedThreadPool(
648 './tests/worker-files/thread/testWorker.mjs'
650 expect(pool.opts.enableTasksQueue).toBe(false)
651 expect(pool.opts.tasksQueueOptions).toBeUndefined()
652 pool.enableTasksQueue(true)
653 expect(pool.opts.enableTasksQueue).toBe(true)
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 1000
661 pool.enableTasksQueue(true, { concurrency: 2 })
662 expect(pool.opts.enableTasksQueue).toBe(true)
663 expect(pool.opts.tasksQueueOptions).toStrictEqual({
665 size: Math.pow(numberOfWorkers, 2),
667 tasksStealingOnBackPressure: true,
668 tasksFinishedTimeout: 1000
670 pool.enableTasksQueue(false)
671 expect(pool.opts.enableTasksQueue).toBe(false)
672 expect(pool.opts.tasksQueueOptions).toBeUndefined()
// Drives pool.setTasksQueueOptions() on a FixedThreadPool created with the
// tasks queue enabled.
// After construction and after each update, the test compares
// pool.opts.tasksQueueOptions to the expected object and checks that every
// worker node's tasksQueueBackPressureSize equals the configured queue size.
// Two updates are applied: one turning task stealing on back pressure off with
// a 2000 tasksFinishedTimeout, and one turning it back on, after which the
// size is numberOfWorkers squared and the timeout is 1000 again.
// The tail of the test passes invalid values (non-object, concurrency
// 0 / -1 / 0.2, size 0 / -1 / 0.2) and expects each call to throw.
676 it('Verify that pool tasks queue options can be set', async () => {
677 const pool = new FixedThreadPool(
679 './tests/worker-files/thread/testWorker.mjs',
680 { enableTasksQueue: true }
682 expect(pool.opts.tasksQueueOptions).toStrictEqual({
684 size: Math.pow(numberOfWorkers, 2),
686 tasksStealingOnBackPressure: true,
687 tasksFinishedTimeout: 1000
689 for (const workerNode of pool.workerNodes) {
690 expect(workerNode.tasksQueueBackPressureSize).toBe(
691 pool.opts.tasksQueueOptions.size
694 pool.setTasksQueueOptions({
698 tasksStealingOnBackPressure: false,
699 tasksFinishedTimeout: 2000
701 expect(pool.opts.tasksQueueOptions).toStrictEqual({
705 tasksStealingOnBackPressure: false,
706 tasksFinishedTimeout: 2000
708 for (const workerNode of pool.workerNodes) {
709 expect(workerNode.tasksQueueBackPressureSize).toBe(
710 pool.opts.tasksQueueOptions.size
713 pool.setTasksQueueOptions({
716 tasksStealingOnBackPressure: true
718 expect(pool.opts.tasksQueueOptions).toStrictEqual({
720 size: Math.pow(numberOfWorkers, 2),
722 tasksStealingOnBackPressure: true,
723 tasksFinishedTimeout: 1000
725 for (const workerNode of pool.workerNodes) {
726 expect(workerNode.tasksQueueBackPressureSize).toBe(
727 pool.opts.tasksQueueOptions.size
730 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
731 new TypeError('Invalid tasks queue options: must be a plain object')
733 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
735 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
738 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
740 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
743 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
744 new TypeError('Invalid worker node tasks concurrency: must be an integer')
746 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
748 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
751 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
753 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
756 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
757 new TypeError('Invalid worker node tasks queue size: must be an integer')
// Compares pool.info against expected values for two pool flavours.
// For a FixedThreadPool: fixed type, thread worker, ROUND_ROBIN strategy, and
// min size, max size, worker node count and idle worker node count all equal
// to numberOfWorkers.
// For a DynamicClusterPool whose minimum is floor(numberOfWorkers / 2):
// dynamic type, cluster worker, ROUND_ROBIN strategy, max size numberOfWorkers,
// and min size, worker node count and idle worker node count all equal to
// that minimum.
762 it('Verify that pool info is set', async () => {
763 let pool = new FixedThreadPool(
765 './tests/worker-files/thread/testWorker.mjs'
767 expect(pool.info).toStrictEqual({
769 type: PoolTypes.fixed,
770 worker: WorkerTypes.thread,
773 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
774 minSize: numberOfWorkers,
775 maxSize: numberOfWorkers,
776 workerNodes: numberOfWorkers,
777 idleWorkerNodes: numberOfWorkers,
784 pool = new DynamicClusterPool(
785 Math.floor(numberOfWorkers / 2),
787 './tests/worker-files/cluster/testWorker.js'
789 expect(pool.info).toStrictEqual({
791 type: PoolTypes.dynamic,
792 worker: WorkerTypes.cluster,
795 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
796 minSize: Math.floor(numberOfWorkers / 2),
797 maxSize: numberOfWorkers,
798 workerNodes: Math.floor(numberOfWorkers / 2),
799 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
// For every worker node of a new FixedClusterPool, checks that the node is a
// WorkerNode and that its `usage` strictly equals a pristine structure:
// sequentiallyStolen is 0 and each of the four history fields is an empty
// CircularArray.
808 it('Verify that pool worker tasks usage are initialized', async () => {
809 const pool = new FixedClusterPool(
811 './tests/worker-files/cluster/testWorker.js'
813 for (const workerNode of pool.workerNodes) {
814 expect(workerNode).toBeInstanceOf(WorkerNode)
815 expect(workerNode.usage).toStrictEqual({
821 sequentiallyStolen: 0,
826 history: new CircularArray()
829 history: new CircularArray()
833 history: new CircularArray()
836 history: new CircularArray()
// For a FixedClusterPool and then a DynamicThreadPool, checks on every worker
// node that it is a WorkerNode, that its tasksQueue is a Deque, and that the
// queue starts with `size` 0 and `maxSize` 0.
844 it('Verify that pool worker tasks queue are initialized', async () => {
845 let pool = new FixedClusterPool(
847 './tests/worker-files/cluster/testWorker.js'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
856 pool = new DynamicThreadPool(
857 Math.floor(numberOfWorkers / 2),
859 './tests/worker-files/thread/testWorker.mjs'
861 for (const workerNode of pool.workerNodes) {
862 expect(workerNode).toBeInstanceOf(WorkerNode)
863 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
864 expect(workerNode.tasksQueue.size).toBe(0)
865 expect(workerNode.tasksQueue.maxSize).toBe(0)
// For a FixedClusterPool and then a DynamicThreadPool, checks on every worker
// node that it is a WorkerNode and that its `info` carries a numeric id and
// the worker type matching the pool flavour (cluster, then thread).
870 it('Verify that pool worker info are initialized', async () => {
871 let pool = new FixedClusterPool(
873 './tests/worker-files/cluster/testWorker.js'
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
877 expect(workerNode.info).toStrictEqual({
878 id: expect.any(Number),
879 type: WorkerTypes.cluster,
885 pool = new DynamicThreadPool(
886 Math.floor(numberOfWorkers / 2),
888 './tests/worker-files/thread/testWorker.mjs'
890 for (const workerNode of pool.workerNodes) {
891 expect(workerNode).toBeInstanceOf(WorkerNode)
892 expect(workerNode.info).toStrictEqual({
893 id: expect.any(Number),
894 type: WorkerTypes.thread,
// Guards against double start and double destroy.
// A new FixedThreadPool reports info.started and info.ready as true, and
// calling start() on it throws 'Cannot start an already started pool'.
// Later in the test both flags are expected to be false, and destroy() is
// expected to reject with 'Cannot destroy an already destroyed pool'.
902 it('Verify that pool statuses are checked at start or destroy', async () => {
903 const pool = new FixedThreadPool(
905 './tests/worker-files/thread/testWorker.mjs'
907 expect(pool.info.started).toBe(true)
908 expect(pool.info.ready).toBe(true)
909 expect(() => pool.start()).toThrow(
910 new Error('Cannot start an already started pool')
913 expect(pool.info.started).toBe(false)
914 expect(pool.info.ready).toBe(false)
915 await expect(pool.destroy()).rejects.toThrow(
916 new Error('Cannot destroy an already destroyed pool')
// Covers a FixedClusterPool that is not started at construction.
// Before start: info.started, info.ready and readyEventEmitted are false,
// workerNodes is an empty array, and execute() rejects with
// 'Cannot execute a task on not started pool'.
// After start: info.started and info.ready are true, the ready event is
// awaited once, readyEventEmitted is true, and the pool holds numberOfWorkers
// worker nodes, each a WorkerNode.
920 it('Verify that pool can be started after initialization', async () => {
921 const pool = new FixedClusterPool(
923 './tests/worker-files/cluster/testWorker.js',
928 expect(pool.info.started).toBe(false)
929 expect(pool.info.ready).toBe(false)
930 expect(pool.readyEventEmitted).toBe(false)
931 expect(pool.workerNodes).toStrictEqual([])
932 await expect(pool.execute()).rejects.toThrow(
933 new Error('Cannot execute a task on not started pool')
936 expect(pool.info.started).toBe(true)
937 expect(pool.info.ready).toBe(true)
938 await waitPoolEvents(pool, PoolEvents.ready, 1)
939 expect(pool.readyEventEmitted).toBe(true)
940 expect(pool.workerNodes.length).toBe(numberOfWorkers)
941 for (const workerNode of pool.workerNodes) {
942 expect(workerNode).toBeInstanceOf(WorkerNode)
// Covers argument validation of pool.execute(data, name, transferList):
// - a non-string name (0) rejects with a TypeError
// - an empty-string name rejects with a TypeError
// - a non-array transferList ({}) rejects with a TypeError
// - an unknown task function name rejects with a plain string, not an Error,
//   hence the `rejects.toBe` matcher on that case
// The final assertion expects execute() to reject with
// 'Cannot execute a task on not started pool'.
947 it('Verify that pool execute() arguments are checked', async () => {
948 const pool = new FixedClusterPool(
950 './tests/worker-files/cluster/testWorker.js'
952 await expect(pool.execute(undefined, 0)).rejects.toThrow(
953 new TypeError('name argument must be a string')
955 await expect(pool.execute(undefined, '')).rejects.toThrow(
956 new TypeError('name argument must not be an empty string')
958 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
959 new TypeError('transferList argument must be an array')
961 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
962 "Task function 'unknown' not found"
965 await expect(pool.execute()).rejects.toThrow(
966 new Error('Cannot execute a task on not started pool')
// Submits numberOfWorkers * maxMultiplier tasks to a FixedClusterPool without
// awaiting them, then inspects each worker node's usage twice.
// While the tasks are in flight, every node is expected to report
// `executing` equal to maxMultiplier.
// After all promises settle, every node is expected to report `executed`
// equal to maxMultiplier.
// In both snapshots sequentiallyStolen is 0 and the history fields only need
// to be CircularArray instances.
970 it('Verify that pool worker tasks usage are computed', async () => {
971 const pool = new FixedClusterPool(
973 './tests/worker-files/cluster/testWorker.js'
975 const promises = new Set()
976 const maxMultiplier = 2
977 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
978 promises.add(pool.execute())
980 for (const workerNode of pool.workerNodes) {
981 expect(workerNode.usage).toStrictEqual({
984 executing: maxMultiplier,
987 sequentiallyStolen: 0,
992 history: expect.any(CircularArray)
995 history: expect.any(CircularArray)
999 history: expect.any(CircularArray)
1002 history: expect.any(CircularArray)
1007 await Promise.all(promises)
1008 for (const workerNode of pool.workerNodes) {
1009 expect(workerNode.usage).toStrictEqual({
1011 executed: maxMultiplier,
1015 sequentiallyStolen: 0,
1020 history: expect.any(CircularArray)
1023 history: expect.any(CircularArray)
1027 history: expect.any(CircularArray)
1030 history: expect.any(CircularArray)
1035 await pool.destroy()
// Runs numberOfWorkers * maxMultiplier tasks on a DynamicThreadPool and waits
// for them, then checks each worker node's usage: the executed count is
// greater than 0 and at most the number of submitted tasks, and the runTime,
// waitTime, elu.idle and elu.active histories are all empty.
// The strategy is then switched to FAIR_SHARE with setWorkerChoiceStrategy(),
// and each node's usage is compared again to an expected structure with
// sequentiallyStolen 0 and the same four histories still empty.
1038 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
1039 const pool = new DynamicThreadPool(
1040 Math.floor(numberOfWorkers / 2),
1042 './tests/worker-files/thread/testWorker.mjs'
1044 const promises = new Set()
1045 const maxMultiplier = 2
1046 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1047 promises.add(pool.execute())
1049 await Promise.all(promises)
1050 for (const workerNode of pool.workerNodes) {
1051 expect(workerNode.usage).toStrictEqual({
1053 executed: expect.any(Number),
1057 sequentiallyStolen: 0,
1062 history: expect.any(CircularArray)
1065 history: expect.any(CircularArray)
1069 history: expect.any(CircularArray)
1072 history: expect.any(CircularArray)
1076 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1077 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1078 numberOfWorkers * maxMultiplier
1080 expect(workerNode.usage.runTime.history.length).toBe(0)
1081 expect(workerNode.usage.waitTime.history.length).toBe(0)
1082 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1083 expect(workerNode.usage.elu.active.history.length).toBe(0)
1085 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1086 for (const workerNode of pool.workerNodes) {
1087 expect(workerNode.usage).toStrictEqual({
1093 sequentiallyStolen: 0,
1098 history: expect.any(CircularArray)
1101 history: expect.any(CircularArray)
1105 history: expect.any(CircularArray)
1108 history: expect.any(CircularArray)
1112 expect(workerNode.usage.runTime.history.length).toBe(0)
1113 expect(workerNode.usage.waitTime.history.length).toBe(0)
1114 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1115 expect(workerNode.usage.elu.active.history.length).toBe(0)
1117 await pool.destroy()
// Registers a listener for PoolEvents.ready on a DynamicClusterPool whose
// emitter starts with no registered event names. After the ready event has
// been awaited once, the emitter lists only the ready event, the listener is
// expected to have run exactly once, and the info it received must describe a
// dynamic cluster pool using ROUND_ROBIN, with numeric size and task counters.
1120 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1121 const pool = new DynamicClusterPool(
1122 Math.floor(numberOfWorkers / 2),
1124 './tests/worker-files/cluster/testWorker.js'
1126 expect(pool.emitter.eventNames()).toStrictEqual([])
1129 pool.emitter.on(PoolEvents.ready, info => {
1133 await waitPoolEvents(pool, PoolEvents.ready, 1)
1134 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1135 expect(poolReady).toBe(1)
1136 expect(poolInfo).toStrictEqual({
1138 type: PoolTypes.dynamic,
1139 worker: WorkerTypes.cluster,
1142 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1143 minSize: expect.any(Number),
1144 maxSize: expect.any(Number),
1145 workerNodes: expect.any(Number),
1146 idleWorkerNodes: expect.any(Number),
1147 busyWorkerNodes: expect.any(Number),
1148 executedTasks: expect.any(Number),
1149 executingTasks: expect.any(Number),
1150 failedTasks: expect.any(Number)
1152 await pool.destroy()
// Registers a listener for PoolEvents.busy on a FixedThreadPool whose emitter
// starts with no registered event names, then submits numberOfWorkers * 2
// tasks and waits for all of them. The listener is expected to have run
// numberOfWorkers + 1 times, and the last info it received must describe a
// fixed thread pool using ROUND_ROBIN, with numeric size and task counters.
1155 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1156 const pool = new FixedThreadPool(
1158 './tests/worker-files/thread/testWorker.mjs'
1160 expect(pool.emitter.eventNames()).toStrictEqual([])
1161 const promises = new Set()
1164 pool.emitter.on(PoolEvents.busy, info => {
1168 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1169 for (let i = 0; i < numberOfWorkers * 2; i++) {
1170 promises.add(pool.execute())
1172 await Promise.all(promises)
1173 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1174 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1175 expect(poolBusy).toBe(numberOfWorkers + 1)
1176 expect(poolInfo).toStrictEqual({
1178 type: PoolTypes.fixed,
1179 worker: WorkerTypes.thread,
1182 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1183 minSize: expect.any(Number),
1184 maxSize: expect.any(Number),
1185 workerNodes: expect.any(Number),
1186 idleWorkerNodes: expect.any(Number),
1187 busyWorkerNodes: expect.any(Number),
1188 executedTasks: expect.any(Number),
1189 executingTasks: expect.any(Number),
1190 failedTasks: expect.any(Number)
1192 await pool.destroy()
// Registers a listener for PoolEvents.full on a DynamicThreadPool whose
// emitter starts with no registered event names, then submits
// numberOfWorkers * 2 tasks and waits for all of them. The listener is
// expected to have run exactly once, and the info it received must describe a
// dynamic thread pool using ROUND_ROBIN, with numeric size and task counters.
1195 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1196 const pool = new DynamicThreadPool(
1197 Math.floor(numberOfWorkers / 2),
1199 './tests/worker-files/thread/testWorker.mjs'
1201 expect(pool.emitter.eventNames()).toStrictEqual([])
1202 const promises = new Set()
1205 pool.emitter.on(PoolEvents.full, info => {
1209 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1210 for (let i = 0; i < numberOfWorkers * 2; i++) {
1211 promises.add(pool.execute())
1213 await Promise.all(promises)
1214 expect(poolFull).toBe(1)
1215 expect(poolInfo).toStrictEqual({
1217 type: PoolTypes.dynamic,
1218 worker: WorkerTypes.thread,
1221 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1222 minSize: expect.any(Number),
1223 maxSize: expect.any(Number),
1224 workerNodes: expect.any(Number),
1225 idleWorkerNodes: expect.any(Number),
1226 busyWorkerNodes: expect.any(Number),
1227 executedTasks: expect.any(Number),
1228 executingTasks: expect.any(Number),
1229 failedTasks: expect.any(Number)
1231 await pool.destroy()
// Registers a listener for PoolEvents.backPressure on a FixedThreadPool that
// has its tasks queue enabled. pool.hasBackPressure is replaced by a sinon
// stub that always returns true, so back pressure is reported regardless of
// the real queue state. After numberOfWorkers + 1 tasks have been submitted
// and awaited, the listener is expected to have run exactly once, the info it
// received must include the queue-related counters (maxQueuedTasks,
// queuedTasks, stolenTasks), and the stub is expected to have been called
// 5 times.
1234 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1235 const pool = new FixedThreadPool(
1237 './tests/worker-files/thread/testWorker.mjs',
1239 enableTasksQueue: true
1242 stub(pool, 'hasBackPressure').returns(true)
1243 expect(pool.emitter.eventNames()).toStrictEqual([])
1244 const promises = new Set()
1245 let poolBackPressure = 0
1247 pool.emitter.on(PoolEvents.backPressure, info => {
1251 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1252 for (let i = 0; i < numberOfWorkers + 1; i++) {
1253 promises.add(pool.execute())
1255 await Promise.all(promises)
1256 expect(poolBackPressure).toBe(1)
1257 expect(poolInfo).toStrictEqual({
1259 type: PoolTypes.fixed,
1260 worker: WorkerTypes.thread,
1263 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1264 minSize: expect.any(Number),
1265 maxSize: expect.any(Number),
1266 workerNodes: expect.any(Number),
1267 idleWorkerNodes: expect.any(Number),
1268 busyWorkerNodes: expect.any(Number),
1269 executedTasks: expect.any(Number),
1270 executingTasks: expect.any(Number),
1271 maxQueuedTasks: expect.any(Number),
1272 queuedTasks: expect.any(Number),
1274 stolenTasks: expect.any(Number),
1275 failedTasks: expect.any(Number)
1277 expect(pool.hasBackPressure.callCount).toBe(5)
1278 await pool.destroy()
// Uses a node:async_hooks hook to observe the async resource created for a
// task. The init callback remembers the asyncId of the resource whose type is
// 'poolifier:task'; the other callbacks count only events matching that id
// (the resolve counter compares against executionAsyncId() instead).
// After a single awaited pool.execute(), the init, before, after and resolve
// counters are each expected to be exactly 1.
1281 it('Verify that pool asynchronous resource track tasks execution', async () => {
1286 let resolveCalls = 0
1287 const hook = createHook({
1288 init (asyncId, type) {
1289 if (type === 'poolifier:task') {
1291 taskAsyncId = asyncId
1295 if (asyncId === taskAsyncId) beforeCalls++
1298 if (asyncId === taskAsyncId) afterCalls++
1301 if (executionAsyncId() === taskAsyncId) resolveCalls++
1304 const pool = new FixedThreadPool(
1306 './tests/worker-files/thread/testWorker.mjs'
1309 await pool.execute()
1311 expect(initCalls).toBe(1)
1312 expect(beforeCalls).toBe(1)
1313 expect(afterCalls).toBe(1)
1314 expect(resolveCalls).toBe(1)
1315 await pool.destroy()
// Checks pool.hasTaskFunction() on a DynamicThreadPool and then on a
// FixedClusterPool, both loaded with the multiple-task-functions test worker.
// Once each pool is ready, the default task name, 'factorial' and 'fibonacci'
// are reported as present and 'unknown' as absent;
// 'jsonIntegerSerialization' is queried as well. Each pool is destroyed
// after its assertions.
1318 it('Verify that hasTaskFunction() is working', async () => {
1319 const dynamicThreadPool = new DynamicThreadPool(
1320 Math.floor(numberOfWorkers / 2),
1322 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1324 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1325 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1326 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1329 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1330 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1331 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1332 await dynamicThreadPool.destroy()
1333 const fixedClusterPool = new FixedClusterPool(
1335 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1337 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1338 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1339 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1342 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1343 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1344 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1345 await fixedClusterPool.destroy()
// Covers pool.addTaskFunction(name, fn) on a ready DynamicThreadPool.
// Argument validation: a non-string name (0) and an empty name reject with a
// TypeError, and a non-function fn (0 or '') rejects with
// TypeError('fn argument must be a function').
// Happy path: adding 'echo' resolves to true, the pool's taskFunctions map
// then holds exactly one entry under 'echo', and executing 'echo' with
// { test: 'test' } yields a strictly equal object.
// Finally, for every worker node, the usage recorded for 'echo' has a numeric
// executed count, sequentiallyStolen 0 and empty CircularArray histories.
1348 it('Verify that addTaskFunction() is working', async () => {
1349 const dynamicThreadPool = new DynamicThreadPool(
1350 Math.floor(numberOfWorkers / 2),
1352 './tests/worker-files/thread/testWorker.mjs'
1354 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1356 dynamicThreadPool.addTaskFunction(0, () => {})
1357 ).rejects.toThrow(new TypeError('name argument must be a string'))
1359 dynamicThreadPool.addTaskFunction('', () => {})
1361 new TypeError('name argument must not be an empty string')
1363 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1364 new TypeError('fn argument must be a function')
1366 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1367 new TypeError('fn argument must be a function')
1369 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1373 const echoTaskFunction = data => {
1377 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1378 ).resolves.toBe(true)
1379 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1380 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1383 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1388 const taskFunctionData = { test: 'test' }
1389 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1390 expect(echoResult).toStrictEqual(taskFunctionData)
1391 for (const workerNode of dynamicThreadPool.workerNodes) {
1392 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1394 executed: expect.any(Number),
1397 sequentiallyStolen: 0,
1402 history: new CircularArray()
1405 history: new CircularArray()
1409 history: new CircularArray()
1412 history: new CircularArray()
1417 await dynamicThreadPool.destroy()
// Covers pool.removeTaskFunction(name) on a ready DynamicThreadPool.
// Removing 'test', which was never added through the pool, rejects with
// 'Cannot remove a task function not handled on the pool side'.
// After 'echo' is added, the pool's taskFunctions map holds one entry; after
// 'echo' is removed, the map is empty again and get('echo') is undefined.
// listTaskFunctionNames() is compared to an expected list at each stage.
1420 it('Verify that removeTaskFunction() is working', async () => {
1421 const dynamicThreadPool = new DynamicThreadPool(
1422 Math.floor(numberOfWorkers / 2),
1424 './tests/worker-files/thread/testWorker.mjs'
1426 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1427 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1431 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1432 new Error('Cannot remove a task function not handled on the pool side')
1434 const echoTaskFunction = data => {
1437 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1438 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1439 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1442 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1447 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1450 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1451 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1452 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1456 await dynamicThreadPool.destroy()
// Checks listTaskFunctionNames() against the multiple-task-functions worker files,
// first with a DynamicThreadPool (thread worker) and then with a FixedClusterPool
// (cluster worker). Each pool is destroyed once its names have been checked.
1459 it('Verify that listTaskFunctionNames() is working', async () => {
1460 const dynamicThreadPool = new DynamicThreadPool(
1461 Math.floor(numberOfWorkers / 2),
1463 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// NOTE(review): presumably waits until the pool has emitted one `ready` event — confirm in test-utils.js
1465 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1466 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1468 'jsonIntegerSerialization',
1472 await dynamicThreadPool.destroy()
// Same check repeated with a cluster-based pool.
1473 const fixedClusterPool = new FixedClusterPool(
1475 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1477 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1478 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1480 'jsonIntegerSerialization',
1484 await fixedClusterPool.destroy()
// Exercises setDefaultTaskFunction() on a DynamicThreadPool using the
// multiple-task-functions thread worker: invalid names (a non-string, the reserved
// default name, an unknown name) are paired with worker-side error messages, while
// 'factorial' and 'fibonacci' resolve to true; the listed names are re-checked after each.
1487 it('Verify that setDefaultTaskFunction() is working', async () => {
1488 const dynamicThreadPool = new DynamicThreadPool(
1489 Math.floor(numberOfWorkers / 2),
1491 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// NOTE(review): presumably waits until the pool has emitted one `ready` event — confirm in test-utils.js
1493 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The id of the first worker node is interpolated into the expected error messages below.
1494 const workerId = dynamicThreadPool.workerNodes[0].info.id
// A non-string name is rejected.
1495 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1497 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Reserved default task name case.
1501 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1504 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Non-existing task function name case.
1508 dynamicThreadPool.setDefaultTaskFunction('unknown')
1511 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1514 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1516 'jsonIntegerSerialization',
// Valid names: each call must resolve to true, then the name list is checked again.
1521 dynamicThreadPool.setDefaultTaskFunction('factorial')
1522 ).resolves.toBe(true)
1523 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1526 'jsonIntegerSerialization',
1530 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1531 ).resolves.toBe(true)
1532 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1535 'jsonIntegerSerialization',
1538 await dynamicThreadPool.destroy()
// Runs four tasks on a DynamicClusterPool using the multiple-task-functions cluster
// worker (the default function plus three named ones), checks their results and the
// pool-level counters, then inspects the per-task-function usage of every worker node.
1541 it('Verify that multiple task functions worker is working', async () => {
1542 const pool = new DynamicClusterPool(
1543 Math.floor(numberOfWorkers / 2),
1545 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
1547 const data = { n: 10 }
// No name given: the call without a task function name yields the same result as 'jsonIntegerSerialization' below.
1548 const result0 = await pool.execute(data)
1549 expect(result0).toStrictEqual({ ok: 1 })
1550 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1551 expect(result1).toStrictEqual({ ok: 1 })
// 10! = 3628800
1552 const result2 = await pool.execute(data, 'factorial')
1553 expect(result2).toBe(3628800)
// fib(10) = 55
1554 const result3 = await pool.execute(data, 'fibonacci')
1555 expect(result3).toBe(55)
// All four awaited tasks are finished: none executing, four executed.
1556 expect(pool.info.executingTasks).toBe(0)
1557 expect(pool.info.executedTasks).toBe(4)
1558 for (const workerNode of pool.workerNodes) {
1559 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1561 'jsonIntegerSerialization',
1565 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Check the usage record of every task function name listed by the pool.
1566 for (const name of pool.listTaskFunctionNames()) {
1567 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1569 executed: expect.any(Number),
1573 sequentiallyStolen: 0,
1577 history: expect.any(CircularArray)
1580 history: expect.any(CircularArray)
1584 history: expect.any(CircularArray)
1587 history: expect.any(CircularArray)
1592 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1593 ).toBeGreaterThan(0)
// NOTE(review): the matcher between these two operands is not visible here; presumably the default
// task function usage is expected to equal the usage of the second listed name — confirm.
1596 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1598 workerNode.getTaskFunctionWorkerUsage(
1599 workerNode.info.taskFunctionNames[1]
1603 await pool.destroy()
// Checks that sendKillMessageToWorker() on a DynamicClusterPool resolves to
// undefined for the worker node at key 0, then destroys the pool.
1606 it('Verify sendKillMessageToWorker()', async () => {
1607 const pool = new DynamicClusterPool(
1608 Math.floor(numberOfWorkers / 2),
1610 './tests/worker-files/cluster/testWorker.js'
// Key (index) of the worker node that receives the kill message.
1612 const workerNodeKey = 0
1614 pool.sendKillMessageToWorker(workerNodeKey)
1615 ).resolves.toBeUndefined()
1616 await pool.destroy()
// Checks that sendTaskFunctionOperationToWorker() can send an 'add' operation for a
// task function named 'empty' to the worker node at key 0 of a DynamicClusterPool:
// the call must resolve to true and 'empty' must appear in that node's task function names.
1619 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1620 const pool = new DynamicClusterPool(
1621 Math.floor(numberOfWorkers / 2),
1623 './tests/worker-files/cluster/testWorker.js'
// Key (index) of the worker node that receives the operation.
1625 const workerNodeKey = 0
1627 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1628 taskFunctionOperation: 'add',
1629 taskFunctionName: 'empty',
// The task function is passed as source text (a stringified no-op arrow function).
1630 taskFunction: (() => {}).toString()
1632 ).resolves.toBe(true)
// 'empty' is appended after the default name and 'test' in the worker node info.
1634 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1635 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1636 await pool.destroy()
1639 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1640 const pool = new DynamicClusterPool(
1641 Math.floor(numberOfWorkers / 2),
1643 './tests/worker-files/cluster/testWorker.js'
1646 pool.sendTaskFunctionOperationToWorkers({
1647 taskFunctionOperation: 'add',
1648 taskFunctionName: 'empty',
1649 taskFunction: (() => {}).toString()
1651 ).resolves.toBe(true)
1652 for (const workerNode of pool.workerNodes) {
1653 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1659 await pool.destroy()