1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e),
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number),
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 },
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler,
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: false,
279 tasksFinishedTimeout: 2000,
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 },
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler,
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 },
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy',
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} },
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions',
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 },
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 },
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 },
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 },
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 },
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 },
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number),
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true },
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true },
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number),
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false },
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false },
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number),
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: false,
595 tasksFinishedTimeout: 2000,
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: false,
604 tasksFinishedTimeout: 2000,
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: false,
623 tasksFinishedTimeout: 2000,
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000,
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000,
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true,
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000,
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: expect.any(CircularBuffer),
767 history: expect.any(CircularBuffer),
771 history: expect.any(CircularBuffer),
774 history: expect.any(CircularBuffer),
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
792 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
793 expect(workerNode.tasksQueue.enablePriority).toBe(false)
796 pool = new DynamicThreadPool(
797 Math.floor(numberOfWorkers / 2),
799 './tests/worker-files/thread/testWorker.mjs'
801 for (const workerNode of pool.workerNodes) {
802 expect(workerNode).toBeInstanceOf(WorkerNode)
803 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
804 expect(workerNode.tasksQueue.size).toBe(0)
805 expect(workerNode.tasksQueue.maxSize).toBe(0)
806 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
807 expect(workerNode.tasksQueue.enablePriority).toBe(false)
812 it('Verify that pool worker info are initialized', async () => {
813 let pool = new FixedClusterPool(
815 './tests/worker-files/cluster/testWorker.cjs'
817 for (const workerNode of pool.workerNodes) {
818 expect(workerNode).toBeInstanceOf(WorkerNode)
819 expect(workerNode.info).toStrictEqual({
820 id: expect.any(Number),
821 type: WorkerTypes.cluster,
829 pool = new DynamicThreadPool(
830 Math.floor(numberOfWorkers / 2),
832 './tests/worker-files/thread/testWorker.mjs'
834 for (const workerNode of pool.workerNodes) {
835 expect(workerNode).toBeInstanceOf(WorkerNode)
836 expect(workerNode.info).toStrictEqual({
837 id: expect.any(Number),
838 type: WorkerTypes.thread,
848 it('Verify that pool statuses are checked at start or destroy', async () => {
849 const pool = new FixedThreadPool(
851 './tests/worker-files/thread/testWorker.mjs'
853 expect(pool.info.started).toBe(true)
854 expect(pool.info.ready).toBe(true)
855 expect(() => pool.start()).toThrow(
856 new Error('Cannot start an already started pool')
859 expect(pool.info.started).toBe(false)
860 expect(pool.info.ready).toBe(false)
861 await expect(pool.destroy()).rejects.toThrow(
862 new Error('Cannot destroy an already destroyed pool')
866 it('Verify that pool can be started after initialization', async () => {
867 const pool = new FixedClusterPool(
869 './tests/worker-files/cluster/testWorker.cjs',
874 expect(pool.info.started).toBe(false)
875 expect(pool.info.ready).toBe(false)
876 expect(pool.workerNodes).toStrictEqual([])
877 expect(pool.readyEventEmitted).toBe(false)
878 await expect(pool.execute()).rejects.toThrow(
879 new Error('Cannot execute a task on not started pool')
882 expect(pool.info.started).toBe(true)
883 expect(pool.info.ready).toBe(true)
884 await waitPoolEvents(pool, PoolEvents.ready, 1)
885 expect(pool.readyEventEmitted).toBe(true)
886 expect(pool.workerNodes.length).toBe(numberOfWorkers)
887 for (const workerNode of pool.workerNodes) {
888 expect(workerNode).toBeInstanceOf(WorkerNode)
893 it('Verify that pool execute() arguments are checked', async () => {
894 const pool = new FixedClusterPool(
896 './tests/worker-files/cluster/testWorker.cjs'
898 await expect(pool.execute(undefined, 0)).rejects.toThrow(
899 new TypeError('name argument must be a string')
901 await expect(pool.execute(undefined, '')).rejects.toThrow(
902 new TypeError('name argument must not be an empty string')
904 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
905 new TypeError('transferList argument must be an array')
907 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
908 "Task function 'unknown' not found"
911 await expect(pool.execute()).rejects.toThrow(
912 new Error('Cannot execute a task on not started pool')
916 it('Verify that pool worker tasks usage are computed', async () => {
917 const pool = new FixedClusterPool(
919 './tests/worker-files/cluster/testWorker.cjs'
921 const promises = new Set()
922 const maxMultiplier = 2
923 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
924 promises.add(pool.execute())
926 for (const workerNode of pool.workerNodes) {
927 expect(workerNode.usage).toStrictEqual({
930 executing: maxMultiplier,
933 sequentiallyStolen: 0,
938 history: expect.any(CircularBuffer),
941 history: expect.any(CircularBuffer),
945 history: expect.any(CircularBuffer),
948 history: expect.any(CircularBuffer),
953 await Promise.all(promises)
954 for (const workerNode of pool.workerNodes) {
955 expect(workerNode.usage).toStrictEqual({
957 executed: maxMultiplier,
961 sequentiallyStolen: 0,
966 history: expect.any(CircularBuffer),
969 history: expect.any(CircularBuffer),
973 history: expect.any(CircularBuffer),
976 history: expect.any(CircularBuffer),
984 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
985 const pool = new DynamicThreadPool(
986 Math.floor(numberOfWorkers / 2),
988 './tests/worker-files/thread/testWorker.mjs'
990 const promises = new Set()
991 const maxMultiplier = 2
992 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
993 promises.add(pool.execute())
995 await Promise.all(promises)
996 for (const workerNode of pool.workerNodes) {
997 expect(workerNode.usage).toStrictEqual({
999 executed: expect.any(Number),
1003 sequentiallyStolen: 0,
1008 history: expect.any(CircularBuffer),
1011 history: expect.any(CircularBuffer),
1015 history: expect.any(CircularBuffer),
1018 history: expect.any(CircularBuffer),
1022 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1023 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1024 numberOfWorkers * maxMultiplier
1027 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1028 for (const workerNode of pool.workerNodes) {
1029 expect(workerNode.usage).toStrictEqual({
1031 executed: expect.any(Number),
1035 sequentiallyStolen: 0,
1040 history: expect.any(CircularBuffer),
1043 history: expect.any(CircularBuffer),
1047 history: expect.any(CircularBuffer),
1050 history: expect.any(CircularBuffer),
1054 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1055 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1056 numberOfWorkers * maxMultiplier
1059 await pool.destroy()
1062 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1063 const pool = new DynamicClusterPool(
1064 Math.floor(numberOfWorkers / 2),
1066 './tests/worker-files/cluster/testWorker.cjs'
1068 expect(pool.emitter.eventNames()).toStrictEqual([])
1071 pool.emitter.on(PoolEvents.ready, info => {
1075 await waitPoolEvents(pool, PoolEvents.ready, 1)
1076 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1077 expect(poolReady).toBe(1)
1078 expect(poolInfo).toStrictEqual({
1080 type: PoolTypes.dynamic,
1081 worker: WorkerTypes.cluster,
1084 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1085 strategyRetries: expect.any(Number),
1086 minSize: expect.any(Number),
1087 maxSize: expect.any(Number),
1088 workerNodes: expect.any(Number),
1089 idleWorkerNodes: expect.any(Number),
1090 busyWorkerNodes: expect.any(Number),
1091 executedTasks: expect.any(Number),
1092 executingTasks: expect.any(Number),
1093 failedTasks: expect.any(Number),
1095 await pool.destroy()
1098 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1099 const pool = new FixedThreadPool(
1101 './tests/worker-files/thread/testWorker.mjs'
1103 expect(pool.emitter.eventNames()).toStrictEqual([])
1104 const promises = new Set()
1107 pool.emitter.on(PoolEvents.busy, info => {
1111 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1112 for (let i = 0; i < numberOfWorkers * 2; i++) {
1113 promises.add(pool.execute())
1115 await Promise.all(promises)
1116 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1117 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1118 expect(poolBusy).toBe(numberOfWorkers + 1)
1119 expect(poolInfo).toStrictEqual({
1121 type: PoolTypes.fixed,
1122 worker: WorkerTypes.thread,
1125 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1126 strategyRetries: expect.any(Number),
1127 minSize: expect.any(Number),
1128 maxSize: expect.any(Number),
1129 workerNodes: expect.any(Number),
1130 idleWorkerNodes: expect.any(Number),
1131 busyWorkerNodes: expect.any(Number),
1132 executedTasks: expect.any(Number),
1133 executingTasks: expect.any(Number),
1134 failedTasks: expect.any(Number),
1136 await pool.destroy()
1139 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1140 const pool = new DynamicThreadPool(
1141 Math.floor(numberOfWorkers / 2),
1143 './tests/worker-files/thread/testWorker.mjs'
1145 expect(pool.emitter.eventNames()).toStrictEqual([])
1146 const promises = new Set()
1149 pool.emitter.on(PoolEvents.full, info => {
1153 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1154 for (let i = 0; i < numberOfWorkers * 2; i++) {
1155 promises.add(pool.execute())
1157 await Promise.all(promises)
1158 expect(poolFull).toBe(1)
1159 expect(poolInfo).toStrictEqual({
1161 type: PoolTypes.dynamic,
1162 worker: WorkerTypes.thread,
1165 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1166 strategyRetries: expect.any(Number),
1167 minSize: expect.any(Number),
1168 maxSize: expect.any(Number),
1169 workerNodes: expect.any(Number),
1170 idleWorkerNodes: expect.any(Number),
1171 busyWorkerNodes: expect.any(Number),
1172 executedTasks: expect.any(Number),
1173 executingTasks: expect.any(Number),
1174 failedTasks: expect.any(Number),
1176 await pool.destroy()
1179 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1180 const pool = new FixedThreadPool(
1182 './tests/worker-files/thread/testWorker.mjs',
1184 enableTasksQueue: true,
1187 stub(pool, 'hasBackPressure').returns(true)
1188 expect(pool.emitter.eventNames()).toStrictEqual([])
1189 const promises = new Set()
1190 let poolBackPressure = 0
1192 pool.emitter.on(PoolEvents.backPressure, info => {
1196 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1197 for (let i = 0; i < numberOfWorkers + 1; i++) {
1198 promises.add(pool.execute())
1200 await Promise.all(promises)
1201 expect(poolBackPressure).toBe(1)
1202 expect(poolInfo).toStrictEqual({
1204 type: PoolTypes.fixed,
1205 worker: WorkerTypes.thread,
1208 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1209 strategyRetries: expect.any(Number),
1210 minSize: expect.any(Number),
1211 maxSize: expect.any(Number),
1212 workerNodes: expect.any(Number),
1213 idleWorkerNodes: expect.any(Number),
1214 stealingWorkerNodes: expect.any(Number),
1215 busyWorkerNodes: expect.any(Number),
1216 executedTasks: expect.any(Number),
1217 executingTasks: expect.any(Number),
1218 maxQueuedTasks: expect.any(Number),
1219 queuedTasks: expect.any(Number),
1221 stolenTasks: expect.any(Number),
1222 failedTasks: expect.any(Number),
1224 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1225 await pool.destroy()
1228 it('Verify that destroy() waits for queued tasks to finish', async () => {
1229 const tasksFinishedTimeout = 2500
1230 const pool = new FixedThreadPool(
1232 './tests/worker-files/thread/asyncWorker.mjs',
1234 enableTasksQueue: true,
1235 tasksQueueOptions: { tasksFinishedTimeout },
1238 const maxMultiplier = 4
1239 let tasksFinished = 0
1240 for (const workerNode of pool.workerNodes) {
1241 workerNode.on('taskFinished', () => {
1245 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1248 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1249 const startTime = performance.now()
1250 await pool.destroy()
1251 const elapsedTime = performance.now() - startTime
1252 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1253 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1254 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1257 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1258 const tasksFinishedTimeout = 1000
1259 const pool = new FixedThreadPool(
1261 './tests/worker-files/thread/asyncWorker.mjs',
1263 enableTasksQueue: true,
1264 tasksQueueOptions: { tasksFinishedTimeout },
1267 const maxMultiplier = 4
1268 let tasksFinished = 0
1269 for (const workerNode of pool.workerNodes) {
1270 workerNode.on('taskFinished', () => {
1274 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1277 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1278 const startTime = performance.now()
1279 await pool.destroy()
1280 const elapsedTime = performance.now() - startTime
1281 expect(tasksFinished).toBe(0)
1282 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1285 it('Verify that pool asynchronous resource track tasks execution', async () => {
1290 let resolveCalls = 0
1291 const hook = createHook({
1292 init (asyncId, type) {
1293 if (type === 'poolifier:task') {
1295 taskAsyncId = asyncId
1299 if (asyncId === taskAsyncId) beforeCalls++
1302 if (asyncId === taskAsyncId) afterCalls++
1305 if (executionAsyncId() === taskAsyncId) resolveCalls++
1308 const pool = new FixedThreadPool(
1310 './tests/worker-files/thread/testWorker.mjs'
1313 await pool.execute()
1315 expect(initCalls).toBe(1)
1316 expect(beforeCalls).toBe(1)
1317 expect(afterCalls).toBe(1)
1318 expect(resolveCalls).toBe(1)
1319 await pool.destroy()
1322 it('Verify that hasTaskFunction() is working', async () => {
1323 const dynamicThreadPool = new DynamicThreadPool(
1324 Math.floor(numberOfWorkers / 2),
1326 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1328 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1329 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1330 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1333 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1334 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1335 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1336 await dynamicThreadPool.destroy()
1337 const fixedClusterPool = new FixedClusterPool(
1339 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1341 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1342 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1343 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1346 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1347 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1348 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1349 await fixedClusterPool.destroy()
1352 it('Verify that addTaskFunction() is working', async () => {
1353 const dynamicThreadPool = new DynamicThreadPool(
1354 Math.floor(numberOfWorkers / 2),
1356 './tests/worker-files/thread/testWorker.mjs'
1358 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1360 dynamicThreadPool.addTaskFunction(0, () => {})
1361 ).rejects.toThrow(new TypeError('name argument must be a string'))
1363 dynamicThreadPool.addTaskFunction('', () => {})
1365 new TypeError('name argument must not be an empty string')
1367 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1368 new TypeError('taskFunction property must be a function')
1370 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1371 new TypeError('taskFunction property must be a function')
1374 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1375 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1377 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1378 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1380 dynamicThreadPool.addTaskFunction('test', {
1381 taskFunction: () => {},
1385 new RangeError("Property 'priority' must be between -20 and 19")
1388 dynamicThreadPool.addTaskFunction('test', {
1389 taskFunction: () => {},
1393 new RangeError("Property 'priority' must be between -20 and 19")
1396 dynamicThreadPool.addTaskFunction('test', {
1397 taskFunction: () => {},
1398 strategy: 'invalidStrategy',
1401 new Error("Invalid worker choice strategy 'invalidStrategy'")
1403 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1404 { name: DEFAULT_TASK_NAME },
1408 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1409 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1410 const echoTaskFunction = data => {
1414 dynamicThreadPool.addTaskFunction('echo', {
1415 taskFunction: echoTaskFunction,
1416 strategy: WorkerChoiceStrategies.LEAST_ELU,
1418 ).resolves.toBe(true)
1419 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1420 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1421 taskFunction: echoTaskFunction,
1422 strategy: WorkerChoiceStrategies.LEAST_ELU,
1425 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1427 WorkerChoiceStrategies.ROUND_ROBIN,
1428 WorkerChoiceStrategies.LEAST_ELU,
1430 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1431 { name: DEFAULT_TASK_NAME },
1433 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1435 const taskFunctionData = { test: 'test' }
1436 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1437 expect(echoResult).toStrictEqual(taskFunctionData)
1438 for (const workerNode of dynamicThreadPool.workerNodes) {
1439 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1441 executed: expect.any(Number),
1444 sequentiallyStolen: 0,
1449 history: expect.any(CircularBuffer),
1452 history: expect.any(CircularBuffer),
1454 elu: expect.objectContaining({
1455 idle: expect.objectContaining({
1456 history: expect.any(CircularBuffer),
1458 active: expect.objectContaining({
1459 history: expect.any(CircularBuffer),
1464 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1465 ).toBeGreaterThan(0)
1467 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1471 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1475 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1476 ).toBeGreaterThan(0)
1479 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1482 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1486 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1487 ).toBeGreaterThanOrEqual(0)
1490 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1493 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1497 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1498 ).toBeGreaterThanOrEqual(0)
1500 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1501 ).toBeLessThanOrEqual(1)
1504 await dynamicThreadPool.destroy()
1507 it('Verify that removeTaskFunction() is working', async () => {
1508 const dynamicThreadPool = new DynamicThreadPool(
1509 Math.floor(numberOfWorkers / 2),
1511 './tests/worker-files/thread/testWorker.mjs'
1513 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1514 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1515 { name: DEFAULT_TASK_NAME },
1518 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1519 new Error('Cannot remove a task function not handled on the pool side')
1521 const echoTaskFunction = data => {
1524 await dynamicThreadPool.addTaskFunction('echo', {
1525 taskFunction: echoTaskFunction,
1526 strategy: WorkerChoiceStrategies.LEAST_ELU,
1528 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1529 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1530 taskFunction: echoTaskFunction,
1531 strategy: WorkerChoiceStrategies.LEAST_ELU,
1534 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1536 WorkerChoiceStrategies.ROUND_ROBIN,
1537 WorkerChoiceStrategies.LEAST_ELU,
1539 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1540 { name: DEFAULT_TASK_NAME },
1542 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1544 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1547 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1548 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1550 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1551 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1552 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1553 { name: DEFAULT_TASK_NAME },
1556 await dynamicThreadPool.destroy()
1559 it('Verify that listTaskFunctionsProperties() is working', async () => {
1560 const dynamicThreadPool = new DynamicThreadPool(
1561 Math.floor(numberOfWorkers / 2),
1563 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1565 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1566 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1567 { name: DEFAULT_TASK_NAME },
1568 { name: 'jsonIntegerSerialization' },
1569 { name: 'factorial' },
1570 { name: 'fibonacci' },
1572 await dynamicThreadPool.destroy()
1573 const fixedClusterPool = new FixedClusterPool(
1575 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1577 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1578 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1579 { name: DEFAULT_TASK_NAME },
1580 { name: 'jsonIntegerSerialization' },
1581 { name: 'factorial' },
1582 { name: 'fibonacci' },
1584 await fixedClusterPool.destroy()
1587 it('Verify that setDefaultTaskFunction() is working', async () => {
1588 const dynamicThreadPool = new DynamicThreadPool(
1589 Math.floor(numberOfWorkers / 2),
1591 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1593 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1594 const workerId = dynamicThreadPool.workerNodes[0].info.id
1595 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1597 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1601 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1604 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1608 dynamicThreadPool.setDefaultTaskFunction('unknown')
1611 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1614 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1615 { name: DEFAULT_TASK_NAME },
1616 { name: 'jsonIntegerSerialization' },
1617 { name: 'factorial' },
1618 { name: 'fibonacci' },
1621 dynamicThreadPool.setDefaultTaskFunction('factorial')
1622 ).resolves.toBe(true)
1623 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1624 { name: DEFAULT_TASK_NAME },
1625 { name: 'factorial' },
1626 { name: 'jsonIntegerSerialization' },
1627 { name: 'fibonacci' },
1630 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1631 ).resolves.toBe(true)
1632 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1633 { name: DEFAULT_TASK_NAME },
1634 { name: 'fibonacci' },
1635 { name: 'jsonIntegerSerialization' },
1636 { name: 'factorial' },
1638 await dynamicThreadPool.destroy()
1641 it('Verify that multiple task functions worker is working', async () => {
1642 const pool = new DynamicClusterPool(
1643 Math.floor(numberOfWorkers / 2),
1645 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1647 const data = { n: 10 }
1648 const result0 = await pool.execute(data)
1649 expect(result0).toStrictEqual({ ok: 1 })
1650 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1651 expect(result1).toStrictEqual({ ok: 1 })
1652 const result2 = await pool.execute(data, 'factorial')
1653 expect(result2).toBe(3628800)
1654 const result3 = await pool.execute(data, 'fibonacci')
1655 expect(result3).toBe(55)
1656 expect(pool.info.executingTasks).toBe(0)
1657 expect(pool.info.executedTasks).toBe(4)
1658 for (const workerNode of pool.workerNodes) {
1659 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1660 { name: DEFAULT_TASK_NAME },
1661 { name: 'jsonIntegerSerialization' },
1662 { name: 'factorial' },
1663 { name: 'fibonacci' },
1665 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1666 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1667 expect(workerNode.tasksQueue.enablePriority).toBe(false)
1668 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1670 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1673 executed: expect.any(Number),
1677 sequentiallyStolen: 0,
1681 history: expect.any(CircularBuffer),
1684 history: expect.any(CircularBuffer),
1688 history: expect.any(CircularBuffer),
1691 history: expect.any(CircularBuffer),
1696 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1698 ).toBeGreaterThan(0)
1701 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1703 workerNode.getTaskFunctionWorkerUsage(
1704 workerNode.info.taskFunctionsProperties[1].name
1708 await pool.destroy()
1711 it('Verify that mapExecute() is working', async () => {
1712 const pool = new DynamicThreadPool(
1713 Math.floor(numberOfWorkers / 2),
1715 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1717 expect(() => pool.mapExecute()).toThrow(
1718 new TypeError('data argument must be a defined iterable')
1720 expect(() => pool.mapExecute(0)).toThrow(
1721 new TypeError('data argument must be an iterable')
1723 let results = await pool.mapExecute([{}, {}, {}, {}])
1724 expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }])
1725 expect(pool.info.executingTasks).toBe(0)
1726 expect(pool.info.executedTasks).toBe(4)
1727 results = await pool.mapExecute(
1728 [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }],
1731 expect(results).toStrictEqual([
1732 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1734 expect(pool.info.executingTasks).toBe(0)
1735 expect(pool.info.executedTasks).toBe(8)
1736 results = await pool.mapExecute(
1737 new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
1740 expect(results).toStrictEqual([
1741 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1743 expect(pool.info.executingTasks).toBe(0)
1744 expect(pool.info.executedTasks).toBe(12)
1745 await pool.destroy()
1748 it('Verify that task function objects worker is working', async () => {
1749 const pool = new DynamicThreadPool(
1750 Math.floor(numberOfWorkers / 2),
1752 './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
1754 const data = { n: 10 }
1755 const result0 = await pool.execute(data)
1756 expect(result0).toStrictEqual({ ok: 1 })
1757 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1758 expect(result1).toStrictEqual({ ok: 1 })
1759 const result2 = await pool.execute(data, 'factorial')
1760 expect(result2).toBe(3628800)
1761 const result3 = await pool.execute(data, 'fibonacci')
1762 expect(result3).toBe(55)
1763 expect(pool.info.executingTasks).toBe(0)
1764 expect(pool.info.executedTasks).toBe(4)
1765 for (const workerNode of pool.workerNodes) {
1766 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1767 { name: DEFAULT_TASK_NAME },
1768 { name: 'jsonIntegerSerialization' },
1769 { name: 'factorial' },
1770 { name: 'fibonacci', priority: -5 },
1772 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1773 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1774 expect(workerNode.tasksQueue.enablePriority).toBe(true)
1775 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1777 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1780 executed: expect.any(Number),
1784 sequentiallyStolen: 0,
1788 history: expect.any(CircularBuffer),
1791 history: expect.any(CircularBuffer),
1795 history: expect.any(CircularBuffer),
1798 history: expect.any(CircularBuffer),
1803 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1805 ).toBeGreaterThan(0)
1808 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1810 workerNode.getTaskFunctionWorkerUsage(
1811 workerNode.info.taskFunctionsProperties[1].name
1815 await pool.destroy()
1818 it('Verify sendKillMessageToWorker()', async () => {
1819 const pool = new DynamicClusterPool(
1820 Math.floor(numberOfWorkers / 2),
1822 './tests/worker-files/cluster/testWorker.cjs'
1824 const workerNodeKey = 0
1826 pool.sendKillMessageToWorker(workerNodeKey)
1827 ).resolves.toBeUndefined()
1828 await pool.destroy()
1831 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1832 const pool = new DynamicClusterPool(
1833 Math.floor(numberOfWorkers / 2),
1835 './tests/worker-files/cluster/testWorker.cjs'
1837 const workerNodeKey = 0
1839 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1840 taskFunctionOperation: 'add',
1841 taskFunctionProperties: { name: 'empty' },
1842 taskFunction: (() => {}).toString(),
1844 ).resolves.toBe(true)
1846 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1848 { name: DEFAULT_TASK_NAME },
1852 await pool.destroy()
1855 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1856 const pool = new DynamicClusterPool(
1857 Math.floor(numberOfWorkers / 2),
1859 './tests/worker-files/cluster/testWorker.cjs'
1862 pool.sendTaskFunctionOperationToWorkers({
1863 taskFunctionOperation: 'add',
1864 taskFunctionProperties: { name: 'empty' },
1865 taskFunction: (() => {}).toString(),
1867 ).resolves.toBe(true)
1868 for (const workerNode of pool.workerNodes) {
1869 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1870 { name: DEFAULT_TASK_NAME },
1875 await pool.destroy()