1 import { EventEmitterAsyncResource } from 'node:events'
2 import { dirname, join } from 'node:path'
3 import { readFileSync } from 'node:fs'
4 import { fileURLToPath } from 'node:url'
5 import { createHook, executionAsyncId } from 'node:async_hooks'
6 import { expect } from 'expect'
7 import { restore, stub } from 'sinon'
15 WorkerChoiceStrategies,
17 } from '../../lib/index.cjs'
18 import { CircularArray } from '../../lib/circular-array.cjs'
19 import { Deque } from '../../lib/deque.cjs'
20 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
21 import { waitPoolEvents } from '../test-utils.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
24 describe('Abstract pool test suite', () => {
25 const version = JSON.parse(
27 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
31 const numberOfWorkers = 2
32 class StubPoolWithIsMain extends FixedThreadPool {
42 it('Verify that pool can be created and destroyed', async () => {
43 const pool = new FixedThreadPool(
45 './tests/worker-files/thread/testWorker.mjs'
47 expect(pool).toBeInstanceOf(FixedThreadPool)
51 it('Verify that pool cannot be created from a non main thread/process', () => {
54 new StubPoolWithIsMain(
56 './tests/worker-files/thread/testWorker.mjs',
58 errorHandler: e => console.error(e)
63 'Cannot start a pool from a worker with the same type as the pool'
68 it('Verify that pool statuses properties are set', async () => {
69 const pool = new FixedThreadPool(
71 './tests/worker-files/thread/testWorker.mjs'
73 expect(pool.started).toBe(true)
74 expect(pool.starting).toBe(false)
75 expect(pool.destroying).toBe(false)
79 it('Verify that filePath is checked', () => {
80 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
81 new TypeError('The worker file path must be specified')
83 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
84 new TypeError('The worker file path must be a string')
87 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
88 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
91 it('Verify that numberOfWorkers is checked', () => {
96 './tests/worker-files/thread/testWorker.mjs'
100 'Cannot instantiate a pool without specifying the number of workers'
105 it('Verify that a negative number of workers is checked', () => {
108 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
111 'Cannot instantiate a pool with a negative number of workers'
116 it('Verify that a non integer number of workers is checked', () => {
119 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
122 'Cannot instantiate a pool with a non safe integer number of workers'
127 it('Verify that pool arguments number and pool type are checked', () => {
132 './tests/worker-files/thread/testWorker.mjs',
138 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
143 it('Verify that dynamic pool sizing is checked', () => {
146 new DynamicClusterPool(
149 './tests/worker-files/cluster/testWorker.cjs'
153 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
158 new DynamicThreadPool(
161 './tests/worker-files/thread/testWorker.mjs'
165 'Cannot instantiate a pool with a non safe integer number of workers'
170 new DynamicClusterPool(
173 './tests/worker-files/cluster/testWorker.cjs'
177 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
182 new DynamicThreadPool(
185 './tests/worker-files/thread/testWorker.mjs'
189 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 new DynamicThreadPool(
197 './tests/worker-files/thread/testWorker.mjs'
201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
206 new DynamicClusterPool(
209 './tests/worker-files/cluster/testWorker.cjs'
213 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 it('Verify that pool options are checked', async () => {
219 let pool = new FixedThreadPool(
221 './tests/worker-files/thread/testWorker.mjs'
223 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
224 expect(pool.opts).toStrictEqual({
227 restartWorkerOnError: true,
228 enableTasksQueue: false,
229 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
231 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
232 .workerChoiceStrategies) {
233 expect(workerChoiceStrategy.opts).toStrictEqual({
234 runTime: { median: false },
235 waitTime: { median: false },
236 elu: { median: false },
237 weights: expect.objectContaining({
238 0: expect.any(Number),
239 [pool.info.maxSize - 1]: expect.any(Number)
244 const testHandler = () => console.info('test handler executed')
245 pool = new FixedThreadPool(
247 './tests/worker-files/thread/testWorker.mjs',
249 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
250 workerChoiceStrategyOptions: {
251 runTime: { median: true },
252 weights: { 0: 300, 1: 200 }
255 restartWorkerOnError: false,
256 enableTasksQueue: true,
257 tasksQueueOptions: { concurrency: 2 },
258 messageHandler: testHandler,
259 errorHandler: testHandler,
260 onlineHandler: testHandler,
261 exitHandler: testHandler
264 expect(pool.emitter).toBeUndefined()
265 expect(pool.opts).toStrictEqual({
268 restartWorkerOnError: false,
269 enableTasksQueue: true,
272 size: Math.pow(numberOfWorkers, 2),
274 tasksStealingOnBackPressure: true,
275 tasksFinishedTimeout: 2000
277 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
278 workerChoiceStrategyOptions: {
279 runTime: { median: true },
280 weights: { 0: 300, 1: 200 }
282 onlineHandler: testHandler,
283 messageHandler: testHandler,
284 errorHandler: testHandler,
285 exitHandler: testHandler
287 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
288 .workerChoiceStrategies) {
289 expect(workerChoiceStrategy.opts).toStrictEqual({
290 runTime: { median: true },
291 waitTime: { median: false },
292 elu: { median: false },
293 weights: { 0: 300, 1: 200 }
299 it('Verify that pool options are validated', () => {
304 './tests/worker-files/thread/testWorker.mjs',
306 workerChoiceStrategy: 'invalidStrategy'
309 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
314 './tests/worker-files/thread/testWorker.mjs',
316 workerChoiceStrategyOptions: { weights: {} }
321 'Invalid worker choice strategy options: must have a weight for each worker node'
328 './tests/worker-files/thread/testWorker.mjs',
330 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
335 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
342 './tests/worker-files/thread/testWorker.mjs',
344 enableTasksQueue: true,
345 tasksQueueOptions: 'invalidTasksQueueOptions'
349 new TypeError('Invalid tasks queue options: must be a plain object')
355 './tests/worker-files/thread/testWorker.mjs',
357 enableTasksQueue: true,
358 tasksQueueOptions: { concurrency: 0 }
363 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
370 './tests/worker-files/thread/testWorker.mjs',
372 enableTasksQueue: true,
373 tasksQueueOptions: { concurrency: -1 }
378 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
385 './tests/worker-files/thread/testWorker.mjs',
387 enableTasksQueue: true,
388 tasksQueueOptions: { concurrency: 0.2 }
392 new TypeError('Invalid worker node tasks concurrency: must be an integer')
398 './tests/worker-files/thread/testWorker.mjs',
400 enableTasksQueue: true,
401 tasksQueueOptions: { size: 0 }
406 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
413 './tests/worker-files/thread/testWorker.mjs',
415 enableTasksQueue: true,
416 tasksQueueOptions: { size: -1 }
421 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
428 './tests/worker-files/thread/testWorker.mjs',
430 enableTasksQueue: true,
431 tasksQueueOptions: { size: 0.2 }
435 new TypeError('Invalid worker node tasks queue size: must be an integer')
439 it('Verify that pool worker choice strategy options can be set', async () => {
440 const pool = new FixedThreadPool(
442 './tests/worker-files/thread/testWorker.mjs',
443 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
445 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
446 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
447 .workerChoiceStrategies) {
448 expect(workerChoiceStrategy.opts).toStrictEqual({
449 runTime: { median: false },
450 waitTime: { median: false },
451 elu: { median: false },
452 weights: expect.objectContaining({
453 0: expect.any(Number),
454 [pool.info.maxSize - 1]: expect.any(Number)
459 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 pool.setWorkerChoiceStrategyOptions({
478 runTime: { median: true },
479 elu: { median: true }
481 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
482 runTime: { median: true },
483 elu: { median: true }
485 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
486 .workerChoiceStrategies) {
487 expect(workerChoiceStrategy.opts).toStrictEqual({
488 runTime: { median: true },
489 waitTime: { median: false },
490 elu: { median: true },
491 weights: expect.objectContaining({
492 0: expect.any(Number),
493 [pool.info.maxSize - 1]: expect.any(Number)
498 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
516 pool.setWorkerChoiceStrategyOptions({
517 runTime: { median: false },
518 elu: { median: false }
520 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
521 runTime: { median: false },
522 elu: { median: false }
524 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
525 .workerChoiceStrategies) {
526 expect(workerChoiceStrategy.opts).toStrictEqual({
527 runTime: { median: false },
528 waitTime: { median: false },
529 elu: { median: false },
530 weights: expect.objectContaining({
531 0: expect.any(Number),
532 [pool.info.maxSize - 1]: expect.any(Number)
537 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
556 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
559 'Invalid worker choice strategy options: must be a plain object'
562 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
564 'Invalid worker choice strategy options: must have a weight for each worker node'
568 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
571 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
577 it('Verify that pool tasks queue can be enabled/disabled', async () => {
578 const pool = new FixedThreadPool(
580 './tests/worker-files/thread/testWorker.mjs'
582 expect(pool.opts.enableTasksQueue).toBe(false)
583 expect(pool.opts.tasksQueueOptions).toBeUndefined()
584 pool.enableTasksQueue(true)
585 expect(pool.opts.enableTasksQueue).toBe(true)
586 expect(pool.opts.tasksQueueOptions).toStrictEqual({
588 size: Math.pow(numberOfWorkers, 2),
590 tasksStealingOnBackPressure: true,
591 tasksFinishedTimeout: 2000
593 pool.enableTasksQueue(true, { concurrency: 2 })
594 expect(pool.opts.enableTasksQueue).toBe(true)
595 expect(pool.opts.tasksQueueOptions).toStrictEqual({
597 size: Math.pow(numberOfWorkers, 2),
599 tasksStealingOnBackPressure: true,
600 tasksFinishedTimeout: 2000
602 pool.enableTasksQueue(false)
603 expect(pool.opts.enableTasksQueue).toBe(false)
604 expect(pool.opts.tasksQueueOptions).toBeUndefined()
608 it('Verify that pool tasks queue options can be set', async () => {
609 const pool = new FixedThreadPool(
611 './tests/worker-files/thread/testWorker.mjs',
612 { enableTasksQueue: true }
614 expect(pool.opts.tasksQueueOptions).toStrictEqual({
616 size: Math.pow(numberOfWorkers, 2),
618 tasksStealingOnBackPressure: true,
619 tasksFinishedTimeout: 2000
621 for (const workerNode of pool.workerNodes) {
622 expect(workerNode.tasksQueueBackPressureSize).toBe(
623 pool.opts.tasksQueueOptions.size
626 pool.setTasksQueueOptions({
630 tasksStealingOnBackPressure: false,
631 tasksFinishedTimeout: 3000
633 expect(pool.opts.tasksQueueOptions).toStrictEqual({
637 tasksStealingOnBackPressure: false,
638 tasksFinishedTimeout: 3000
640 for (const workerNode of pool.workerNodes) {
641 expect(workerNode.tasksQueueBackPressureSize).toBe(
642 pool.opts.tasksQueueOptions.size
645 pool.setTasksQueueOptions({
648 tasksStealingOnBackPressure: true
650 expect(pool.opts.tasksQueueOptions).toStrictEqual({
652 size: Math.pow(numberOfWorkers, 2),
654 tasksStealingOnBackPressure: true,
655 tasksFinishedTimeout: 2000
657 for (const workerNode of pool.workerNodes) {
658 expect(workerNode.tasksQueueBackPressureSize).toBe(
659 pool.opts.tasksQueueOptions.size
662 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
663 new TypeError('Invalid tasks queue options: must be a plain object')
665 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
667 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
670 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
672 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
675 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
676 new TypeError('Invalid worker node tasks concurrency: must be an integer')
678 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
680 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
683 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
685 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
688 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
689 new TypeError('Invalid worker node tasks queue size: must be an integer')
694 it('Verify that pool info is set', async () => {
695 let pool = new FixedThreadPool(
697 './tests/worker-files/thread/testWorker.mjs'
699 expect(pool.info).toStrictEqual({
701 type: PoolTypes.fixed,
702 worker: WorkerTypes.thread,
705 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
706 minSize: numberOfWorkers,
707 maxSize: numberOfWorkers,
708 workerNodes: numberOfWorkers,
709 idleWorkerNodes: numberOfWorkers,
716 pool = new DynamicClusterPool(
717 Math.floor(numberOfWorkers / 2),
719 './tests/worker-files/cluster/testWorker.cjs'
721 expect(pool.info).toStrictEqual({
723 type: PoolTypes.dynamic,
724 worker: WorkerTypes.cluster,
727 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
728 minSize: Math.floor(numberOfWorkers / 2),
729 maxSize: numberOfWorkers,
730 workerNodes: Math.floor(numberOfWorkers / 2),
731 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
740 it('Verify that pool worker tasks usage are initialized', async () => {
741 const pool = new FixedClusterPool(
743 './tests/worker-files/cluster/testWorker.cjs'
745 for (const workerNode of pool.workerNodes) {
746 expect(workerNode).toBeInstanceOf(WorkerNode)
747 expect(workerNode.usage).toStrictEqual({
753 sequentiallyStolen: 0,
758 history: new CircularArray()
761 history: new CircularArray()
765 history: new CircularArray()
768 history: new CircularArray()
776 it('Verify that pool worker tasks queue are initialized', async () => {
777 let pool = new FixedClusterPool(
779 './tests/worker-files/cluster/testWorker.cjs'
781 for (const workerNode of pool.workerNodes) {
782 expect(workerNode).toBeInstanceOf(WorkerNode)
783 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
784 expect(workerNode.tasksQueue.size).toBe(0)
785 expect(workerNode.tasksQueue.maxSize).toBe(0)
788 pool = new DynamicThreadPool(
789 Math.floor(numberOfWorkers / 2),
791 './tests/worker-files/thread/testWorker.mjs'
793 for (const workerNode of pool.workerNodes) {
794 expect(workerNode).toBeInstanceOf(WorkerNode)
795 expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
796 expect(workerNode.tasksQueue.size).toBe(0)
797 expect(workerNode.tasksQueue.maxSize).toBe(0)
802 it('Verify that pool worker info are initialized', async () => {
803 let pool = new FixedClusterPool(
805 './tests/worker-files/cluster/testWorker.cjs'
807 for (const workerNode of pool.workerNodes) {
808 expect(workerNode).toBeInstanceOf(WorkerNode)
809 expect(workerNode.info).toStrictEqual({
810 id: expect.any(Number),
811 type: WorkerTypes.cluster,
818 pool = new DynamicThreadPool(
819 Math.floor(numberOfWorkers / 2),
821 './tests/worker-files/thread/testWorker.mjs'
823 for (const workerNode of pool.workerNodes) {
824 expect(workerNode).toBeInstanceOf(WorkerNode)
825 expect(workerNode.info).toStrictEqual({
826 id: expect.any(Number),
827 type: WorkerTypes.thread,
836 it('Verify that pool statuses are checked at start or destroy', async () => {
837 const pool = new FixedThreadPool(
839 './tests/worker-files/thread/testWorker.mjs'
841 expect(pool.info.started).toBe(true)
842 expect(pool.info.ready).toBe(true)
843 expect(() => pool.start()).toThrow(
844 new Error('Cannot start an already started pool')
847 expect(pool.info.started).toBe(false)
848 expect(pool.info.ready).toBe(false)
849 await expect(pool.destroy()).rejects.toThrow(
850 new Error('Cannot destroy an already destroyed pool')
854 it('Verify that pool can be started after initialization', async () => {
855 const pool = new FixedClusterPool(
857 './tests/worker-files/cluster/testWorker.cjs',
862 expect(pool.info.started).toBe(false)
863 expect(pool.info.ready).toBe(false)
864 expect(pool.readyEventEmitted).toBe(false)
865 expect(pool.workerNodes).toStrictEqual([])
866 await expect(pool.execute()).rejects.toThrow(
867 new Error('Cannot execute a task on not started pool')
870 expect(pool.info.started).toBe(true)
871 expect(pool.info.ready).toBe(true)
872 await waitPoolEvents(pool, PoolEvents.ready, 1)
873 expect(pool.readyEventEmitted).toBe(true)
874 expect(pool.workerNodes.length).toBe(numberOfWorkers)
875 for (const workerNode of pool.workerNodes) {
876 expect(workerNode).toBeInstanceOf(WorkerNode)
881 it('Verify that pool execute() arguments are checked', async () => {
882 const pool = new FixedClusterPool(
884 './tests/worker-files/cluster/testWorker.cjs'
886 await expect(pool.execute(undefined, 0)).rejects.toThrow(
887 new TypeError('name argument must be a string')
889 await expect(pool.execute(undefined, '')).rejects.toThrow(
890 new TypeError('name argument must not be an empty string')
892 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
893 new TypeError('transferList argument must be an array')
895 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
896 "Task function 'unknown' not found"
899 await expect(pool.execute()).rejects.toThrow(
900 new Error('Cannot execute a task on not started pool')
904 it('Verify that pool worker tasks usage are computed', async () => {
905 const pool = new FixedClusterPool(
907 './tests/worker-files/cluster/testWorker.cjs'
909 const promises = new Set()
910 const maxMultiplier = 2
911 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
912 promises.add(pool.execute())
914 for (const workerNode of pool.workerNodes) {
915 expect(workerNode.usage).toStrictEqual({
918 executing: maxMultiplier,
921 sequentiallyStolen: 0,
926 history: expect.any(CircularArray)
929 history: expect.any(CircularArray)
933 history: expect.any(CircularArray)
936 history: expect.any(CircularArray)
941 await Promise.all(promises)
942 for (const workerNode of pool.workerNodes) {
943 expect(workerNode.usage).toStrictEqual({
945 executed: maxMultiplier,
949 sequentiallyStolen: 0,
954 history: expect.any(CircularArray)
957 history: expect.any(CircularArray)
961 history: expect.any(CircularArray)
964 history: expect.any(CircularArray)
972 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
973 const pool = new DynamicThreadPool(
974 Math.floor(numberOfWorkers / 2),
976 './tests/worker-files/thread/testWorker.mjs'
978 const promises = new Set()
979 const maxMultiplier = 2
980 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
981 promises.add(pool.execute())
983 await Promise.all(promises)
984 for (const workerNode of pool.workerNodes) {
985 expect(workerNode.usage).toStrictEqual({
987 executed: expect.any(Number),
991 sequentiallyStolen: 0,
996 history: expect.any(CircularArray)
999 history: expect.any(CircularArray)
1003 history: expect.any(CircularArray)
1006 history: expect.any(CircularArray)
1010 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1011 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1012 numberOfWorkers * maxMultiplier
1014 expect(workerNode.usage.runTime.history.length).toBe(0)
1015 expect(workerNode.usage.waitTime.history.length).toBe(0)
1016 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1017 expect(workerNode.usage.elu.active.history.length).toBe(0)
1019 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1020 for (const workerNode of pool.workerNodes) {
1021 expect(workerNode.usage).toStrictEqual({
1027 sequentiallyStolen: 0,
1032 history: expect.any(CircularArray)
1035 history: expect.any(CircularArray)
1039 history: expect.any(CircularArray)
1042 history: expect.any(CircularArray)
1046 expect(workerNode.usage.runTime.history.length).toBe(0)
1047 expect(workerNode.usage.waitTime.history.length).toBe(0)
1048 expect(workerNode.usage.elu.idle.history.length).toBe(0)
1049 expect(workerNode.usage.elu.active.history.length).toBe(0)
1051 await pool.destroy()
1054 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1055 const pool = new DynamicClusterPool(
1056 Math.floor(numberOfWorkers / 2),
1058 './tests/worker-files/cluster/testWorker.cjs'
1060 expect(pool.emitter.eventNames()).toStrictEqual([])
1063 pool.emitter.on(PoolEvents.ready, info => {
1067 await waitPoolEvents(pool, PoolEvents.ready, 1)
1068 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1069 expect(poolReady).toBe(1)
1070 expect(poolInfo).toStrictEqual({
1072 type: PoolTypes.dynamic,
1073 worker: WorkerTypes.cluster,
1076 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1077 minSize: expect.any(Number),
1078 maxSize: expect.any(Number),
1079 workerNodes: expect.any(Number),
1080 idleWorkerNodes: expect.any(Number),
1081 busyWorkerNodes: expect.any(Number),
1082 executedTasks: expect.any(Number),
1083 executingTasks: expect.any(Number),
1084 failedTasks: expect.any(Number)
1086 await pool.destroy()
1089 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1090 const pool = new FixedThreadPool(
1092 './tests/worker-files/thread/testWorker.mjs'
1094 expect(pool.emitter.eventNames()).toStrictEqual([])
1095 const promises = new Set()
1098 pool.emitter.on(PoolEvents.busy, info => {
1102 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1103 for (let i = 0; i < numberOfWorkers * 2; i++) {
1104 promises.add(pool.execute())
1106 await Promise.all(promises)
1107 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
1108 // So it is triggered numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1109 expect(poolBusy).toBe(numberOfWorkers + 1)
1110 expect(poolInfo).toStrictEqual({
1112 type: PoolTypes.fixed,
1113 worker: WorkerTypes.thread,
1116 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1117 minSize: expect.any(Number),
1118 maxSize: expect.any(Number),
1119 workerNodes: expect.any(Number),
1120 idleWorkerNodes: expect.any(Number),
1121 busyWorkerNodes: expect.any(Number),
1122 executedTasks: expect.any(Number),
1123 executingTasks: expect.any(Number),
1124 failedTasks: expect.any(Number)
1126 await pool.destroy()
1129 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1130 const pool = new DynamicThreadPool(
1131 Math.floor(numberOfWorkers / 2),
1133 './tests/worker-files/thread/testWorker.mjs'
1135 expect(pool.emitter.eventNames()).toStrictEqual([])
1136 const promises = new Set()
1139 pool.emitter.on(PoolEvents.full, info => {
1143 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1144 for (let i = 0; i < numberOfWorkers * 2; i++) {
1145 promises.add(pool.execute())
1147 await Promise.all(promises)
1148 expect(poolFull).toBe(1)
1149 expect(poolInfo).toStrictEqual({
1151 type: PoolTypes.dynamic,
1152 worker: WorkerTypes.thread,
1155 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1156 minSize: expect.any(Number),
1157 maxSize: expect.any(Number),
1158 workerNodes: expect.any(Number),
1159 idleWorkerNodes: expect.any(Number),
1160 busyWorkerNodes: expect.any(Number),
1161 executedTasks: expect.any(Number),
1162 executingTasks: expect.any(Number),
1163 failedTasks: expect.any(Number)
1165 await pool.destroy()
1168 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1169 const pool = new FixedThreadPool(
1171 './tests/worker-files/thread/testWorker.mjs',
1173 enableTasksQueue: true
1176 stub(pool, 'hasBackPressure').returns(true)
1177 expect(pool.emitter.eventNames()).toStrictEqual([])
1178 const promises = new Set()
1179 let poolBackPressure = 0
1181 pool.emitter.on(PoolEvents.backPressure, info => {
1185 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1186 for (let i = 0; i < numberOfWorkers + 1; i++) {
1187 promises.add(pool.execute())
1189 await Promise.all(promises)
1190 expect(poolBackPressure).toBe(1)
1191 expect(poolInfo).toStrictEqual({
1193 type: PoolTypes.fixed,
1194 worker: WorkerTypes.thread,
1197 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
1198 minSize: expect.any(Number),
1199 maxSize: expect.any(Number),
1200 workerNodes: expect.any(Number),
1201 idleWorkerNodes: expect.any(Number),
1202 stealingWorkerNodes: expect.any(Number),
1203 busyWorkerNodes: expect.any(Number),
1204 executedTasks: expect.any(Number),
1205 executingTasks: expect.any(Number),
1206 maxQueuedTasks: expect.any(Number),
1207 queuedTasks: expect.any(Number),
1209 stolenTasks: expect.any(Number),
1210 failedTasks: expect.any(Number)
1212 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1213 await pool.destroy()
1216 it('Verify that destroy() waits for queued tasks to finish', async () => {
1217 const tasksFinishedTimeout = 2500
1218 const pool = new FixedThreadPool(
1220 './tests/worker-files/thread/asyncWorker.mjs',
1222 enableTasksQueue: true,
1223 tasksQueueOptions: { tasksFinishedTimeout }
1226 const maxMultiplier = 4
1227 let tasksFinished = 0
1228 for (const workerNode of pool.workerNodes) {
1229 workerNode.on('taskFinished', () => {
1233 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1236 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1237 const startTime = performance.now()
1238 await pool.destroy()
1239 const elapsedTime = performance.now() - startTime
1240 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1241 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1242 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400)
1245 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1246 const tasksFinishedTimeout = 1000
1247 const pool = new FixedThreadPool(
1249 './tests/worker-files/thread/asyncWorker.mjs',
1251 enableTasksQueue: true,
1252 tasksQueueOptions: { tasksFinishedTimeout }
1255 const maxMultiplier = 4
1256 let tasksFinished = 0
1257 for (const workerNode of pool.workerNodes) {
1258 workerNode.on('taskFinished', () => {
1262 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1265 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1266 const startTime = performance.now()
1267 await pool.destroy()
1268 const elapsedTime = performance.now() - startTime
1269 expect(tasksFinished).toBe(0)
1270 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1273 it('Verify that pool asynchronous resource track tasks execution', async () => {
1278 let resolveCalls = 0
1279 const hook = createHook({
1280 init (asyncId, type) {
1281 if (type === 'poolifier:task') {
1283 taskAsyncId = asyncId
1287 if (asyncId === taskAsyncId) beforeCalls++
1290 if (asyncId === taskAsyncId) afterCalls++
1293 if (executionAsyncId() === taskAsyncId) resolveCalls++
1296 const pool = new FixedThreadPool(
1298 './tests/worker-files/thread/testWorker.mjs'
1301 await pool.execute()
1303 expect(initCalls).toBe(1)
1304 expect(beforeCalls).toBe(1)
1305 expect(afterCalls).toBe(1)
1306 expect(resolveCalls).toBe(1)
1307 await pool.destroy()
1310 it('Verify that hasTaskFunction() is working', async () => {
1311 const dynamicThreadPool = new DynamicThreadPool(
1312 Math.floor(numberOfWorkers / 2),
1314 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1316 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1317 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1318 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1321 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1322 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1323 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1324 await dynamicThreadPool.destroy()
1325 const fixedClusterPool = new FixedClusterPool(
1327 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1329 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1330 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1331 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1334 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1335 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1336 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1337 await fixedClusterPool.destroy()
1340 it('Verify that addTaskFunction() is working', async () => {
1341 const dynamicThreadPool = new DynamicThreadPool(
1342 Math.floor(numberOfWorkers / 2),
1344 './tests/worker-files/thread/testWorker.mjs'
1346 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1348 dynamicThreadPool.addTaskFunction(0, () => {})
1349 ).rejects.toThrow(new TypeError('name argument must be a string'))
1351 dynamicThreadPool.addTaskFunction('', () => {})
1353 new TypeError('name argument must not be an empty string')
1355 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1356 new TypeError('fn argument must be a function')
1358 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1359 new TypeError('fn argument must be a function')
1361 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1365 const echoTaskFunction = data => {
1369 dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1370 ).resolves.toBe(true)
1371 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1372 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1375 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1380 const taskFunctionData = { test: 'test' }
1381 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1382 expect(echoResult).toStrictEqual(taskFunctionData)
1383 for (const workerNode of dynamicThreadPool.workerNodes) {
1384 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1386 executed: expect.any(Number),
1389 sequentiallyStolen: 0,
1394 history: new CircularArray()
1397 history: new CircularArray()
1401 history: new CircularArray()
1404 history: new CircularArray()
1409 await dynamicThreadPool.destroy()
// Test case: removeTaskFunction() on a DynamicThreadPool backed by testWorker.mjs.
// Flow: removing a name that was never added through the pool must reject,
// then 'echo' is added from the pool side, checked, removed, and checked again.
1412 it('Verify that removeTaskFunction() is working', async () => {
1413 const dynamicThreadPool = new DynamicThreadPool(
1414 Math.floor(numberOfWorkers / 2),
1416 './tests/worker-files/thread/testWorker.mjs'
// Await the waitPoolEvents helper for PoolEvents.ready (count argument 1) before using the pool.
1418 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1419 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// 'test' has not been added via addTaskFunction() in this test, so removal is expected to reject.
1423 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1424 new Error('Cannot remove a task function not handled on the pool side')
1426 const echoTaskFunction = data => {
// Register 'echo' from the pool side; the pool's taskFunctions map should then hold exactly one entry.
1429 await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
1430 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1431 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
1434 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Removing 'echo' must empty the pool-side map and drop the 'echo' entry.
1439 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1442 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1443 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1444 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
// Always tear the pool down at the end of the test.
1448 await dynamicThreadPool.destroy()
// Test case: listTaskFunctionNames() is asserted on two pool flavours, a
// DynamicThreadPool (thread worker file, .mjs) and a FixedClusterPool
// (cluster worker file, .cjs), both using a testMultipleTaskFunctionsWorker file.
1451 it('Verify that listTaskFunctionNames() is working', async () => {
1452 const dynamicThreadPool = new DynamicThreadPool(
1453 Math.floor(numberOfWorkers / 2),
1455 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
// Await the waitPoolEvents helper for PoolEvents.ready (count argument 1) before listing names.
1457 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1458 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1460 'jsonIntegerSerialization',
// The thread pool is destroyed before the cluster pool is created.
1464 await dynamicThreadPool.destroy()
1465 const fixedClusterPool = new FixedClusterPool(
1467 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1469 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
// Same assertion against the cluster-based pool.
1470 expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
1472 'jsonIntegerSerialization',
1476 await fixedClusterPool.destroy()
// Test case: setDefaultTaskFunction() on a DynamicThreadPool backed by the
// thread testMultipleTaskFunctionsWorker.mjs file.
// Rejection cases covered: a non-string name (0), the reserved DEFAULT_TASK_NAME,
// and an unknown name. Success cases covered: 'factorial' and 'fibonacci'.
1479 it('Verify that setDefaultTaskFunction() is working', async () => {
1480 const dynamicThreadPool = new DynamicThreadPool(
1481 Math.floor(numberOfWorkers / 2),
1483 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1485 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
// The expected error messages below embed the id of the first worker node.
1486 const workerId = dynamicThreadPool.workerNodes[0].info.id
// Non-string name: the expected message quotes a worker-side TypeError.
1487 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1489 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
// Reserved default name: cannot itself be set as the default task function.
1493 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1496 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
// Unknown name: must refer to an existing task function.
1500 dynamicThreadPool.setDefaultTaskFunction('unknown')
1503 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
// listTaskFunctionNames() is asserted after the rejected attempts.
1506 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1508 'jsonIntegerSerialization',
// Valid change #1: 'factorial' resolves to true, then the name list is re-asserted.
1513 dynamicThreadPool.setDefaultTaskFunction('factorial')
1514 ).resolves.toBe(true)
1515 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1518 'jsonIntegerSerialization',
// Valid change #2: 'fibonacci' resolves to true, then the name list is re-asserted.
1522 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1523 ).resolves.toBe(true)
1524 expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
1527 'jsonIntegerSerialization',
1530 await dynamicThreadPool.destroy()
// Test case: a DynamicClusterPool backed by the cluster
// testMultipleTaskFunctionsWorker.cjs file executes the same payload through
// the default task function and through three named ones, then inspects the
// pool counters and the per-worker, per-task-function usage records.
1533 it('Verify that multiple task functions worker is working', async () => {
1534 const pool = new DynamicClusterPool(
1535 Math.floor(numberOfWorkers / 2),
1537 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1539 const data = { n: 10 }
// No name given: the pool's default task function handles the payload.
1540 const result0 = await pool.execute(data)
1541 expect(result0).toStrictEqual({ ok: 1 })
1542 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1543 expect(result1).toStrictEqual({ ok: 1 })
// 3628800 is 10!, matching n = 10.
1544 const result2 = await pool.execute(data, 'factorial')
1545 expect(result2).toBe(3628800)
// 55 is the 10th Fibonacci number, matching n = 10.
1546 const result3 = await pool.execute(data, 'fibonacci')
1547 expect(result3).toBe(55)
// Four awaited execute() calls: nothing in flight, four tasks executed.
1548 expect(pool.info.executingTasks).toBe(0)
1549 expect(pool.info.executedTasks).toBe(4)
// Every worker node must advertise the task function names and hold three usage entries.
1550 for (const workerNode of pool.workerNodes) {
1551 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1553 'jsonIntegerSerialization',
1557 expect(workerNode.taskFunctionsUsage.size).toBe(3)
// Shape check of the usage record for each listed task function name.
1558 for (const name of pool.listTaskFunctionNames()) {
1559 expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
1561 executed: expect.any(Number),
1565 sequentiallyStolen: 0,
1569 history: expect.any(CircularArray)
1572 history: expect.any(CircularArray)
1576 history: expect.any(CircularArray)
1579 history: expect.any(CircularArray)
// Each listed task function must have been executed at least once on this worker node.
1584 workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
1585 ).toBeGreaterThan(0)
// Usage looked up via DEFAULT_TASK_NAME is checked against usage looked up via the name at index 1.
1588 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1590 workerNode.getTaskFunctionWorkerUsage(
1591 workerNode.info.taskFunctionNames[1]
1595 await pool.destroy()
// Test case: sendKillMessageToWorker() called for worker node key 0 on a
// DynamicClusterPool (cluster testWorker.cjs) must resolve to undefined.
1598 it('Verify sendKillMessageToWorker()', async () => {
1599 const pool = new DynamicClusterPool(
1600 Math.floor(numberOfWorkers / 2),
1602 './tests/worker-files/cluster/testWorker.cjs'
// Key of the worker node targeted by the kill message.
1604 const workerNodeKey = 0
1606 pool.sendKillMessageToWorker(workerNodeKey)
1607 ).resolves.toBeUndefined()
1608 await pool.destroy()
// Test case: sendTaskFunctionOperationToWorker() sends an 'add' operation for
// a task function named 'empty' to worker node key 0 of a DynamicClusterPool
// (cluster testWorker.cjs) and expects the call to resolve to true.
1611 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1612 const pool = new DynamicClusterPool(
1613 Math.floor(numberOfWorkers / 2),
1615 './tests/worker-files/cluster/testWorker.cjs'
// Key of the worker node that receives the operation.
1617 const workerNodeKey = 0
1619 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1620 taskFunctionOperation: 'add',
1621 taskFunctionName: 'empty',
// The task function is passed as its source text (a no-op arrow function stringified).
1622 taskFunction: (() => {}).toString()
1624 ).resolves.toBe(true)
// The targeted worker node must now list 'empty' after the default name and 'test'.
1626 pool.workerNodes[workerNodeKey].info.taskFunctionNames
1627 ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
1628 await pool.destroy()
// Test case: sendTaskFunctionOperationToWorkers() sends an 'add' operation for
// a task function named 'empty' without a worker node key on a
// DynamicClusterPool (cluster testWorker.cjs), expects the call to resolve to
// true, then checks the task function names on every worker node.
1631 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1632 const pool = new DynamicClusterPool(
1633 Math.floor(numberOfWorkers / 2),
1635 './tests/worker-files/cluster/testWorker.cjs'
1638 pool.sendTaskFunctionOperationToWorkers({
1639 taskFunctionOperation: 'add',
1640 taskFunctionName: 'empty',
// The task function is passed as its source text (a no-op arrow function stringified).
1641 taskFunction: (() => {}).toString()
1643 ).resolves.toBe(true)
// Per-worker-node assertion on the advertised task function names.
1644 for (const workerNode of pool.workerNodes) {
1645 expect(workerNode.info.taskFunctionNames).toStrictEqual([
1651 await pool.destroy()