1 import { expect } from 'expect'
2 // eslint-disable-next-line n/no-unsupported-features/node-builtins
3 import { createHook, executionAsyncId } from 'node:async_hooks'
4 import { EventEmitterAsyncResource } from 'node:events'
5 import { readFileSync } from 'node:fs'
6 import { dirname, join } from 'node:path'
7 import { fileURLToPath } from 'node:url'
9 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
17 WorkerChoiceStrategies,
19 } from '../../lib/index.cjs'
20 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
21 import { PriorityQueue } from '../../lib/queues/priority-queue.cjs'
22 import { defaultBucketSize } from '../../lib/queues/queue-types.cjs'
23 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
24 import { waitPoolEvents } from '../test-utils.cjs'
26 describe('Abstract pool test suite', () => {
27 const version = JSON.parse(
29 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
33 const numberOfWorkers = 2
34 class StubPoolWithIsMain extends FixedThreadPool {
40 it('Verify that pool can be created and destroyed', async () => {
41 const pool = new FixedThreadPool(
43 './tests/worker-files/thread/testWorker.mjs'
45 expect(pool).toBeInstanceOf(FixedThreadPool)
49 it('Verify that pool cannot be created from a non main thread/process', () => {
52 new StubPoolWithIsMain(
54 './tests/worker-files/thread/testWorker.mjs',
56 errorHandler: e => console.error(e),
61 'Cannot start a pool from a worker with the same type as the pool'
66 it('Verify that pool statuses properties are set', async () => {
67 const pool = new FixedThreadPool(
69 './tests/worker-files/thread/testWorker.mjs'
71 expect(pool.started).toBe(true)
72 expect(pool.starting).toBe(false)
73 expect(pool.destroying).toBe(false)
75 expect(pool.started).toBe(false)
76 expect(pool.starting).toBe(false)
77 expect(pool.destroying).toBe(false)
80 it('Verify that filePath is checked', () => {
81 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
82 new TypeError('The worker file path must be specified')
84 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
85 new TypeError('The worker file path must be a string')
88 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
89 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
92 it('Verify that numberOfWorkers is checked', () => {
97 './tests/worker-files/thread/testWorker.mjs'
101 'Cannot instantiate a pool without specifying the number of workers'
106 it('Verify that a negative number of workers is checked', () => {
109 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
112 'Cannot instantiate a pool with a negative number of workers'
117 it('Verify that a non integer number of workers is checked', () => {
120 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
123 'Cannot instantiate a pool with a non safe integer number of workers'
128 it('Verify that pool arguments number and pool type are checked', () => {
133 './tests/worker-files/thread/testWorker.mjs',
139 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
144 it('Verify that dynamic pool sizing is checked', () => {
147 new DynamicClusterPool(
150 './tests/worker-files/cluster/testWorker.cjs'
154 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
159 new DynamicThreadPool(
162 './tests/worker-files/thread/testWorker.mjs'
166 'Cannot instantiate a pool with a non safe integer number of workers'
171 new DynamicClusterPool(
174 './tests/worker-files/cluster/testWorker.cjs'
178 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
183 new DynamicThreadPool(
186 './tests/worker-files/thread/testWorker.mjs'
190 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
195 new DynamicThreadPool(
198 './tests/worker-files/thread/testWorker.mjs'
202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
207 new DynamicClusterPool(
210 './tests/worker-files/cluster/testWorker.cjs'
214 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
219 it('Verify that pool options are checked', async () => {
220 let pool = new FixedThreadPool(
222 './tests/worker-files/thread/testWorker.mjs'
224 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
225 expect(pool.emitter.eventNames()).toStrictEqual([])
226 expect(pool.opts).toStrictEqual({
228 enableTasksQueue: false,
229 restartWorkerOnError: true,
231 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
233 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
234 .workerChoiceStrategies) {
235 expect(workerChoiceStrategy.opts).toStrictEqual({
236 elu: { median: false },
237 runTime: { median: false },
238 waitTime: { median: false },
239 weights: expect.objectContaining({
240 0: expect.any(Number),
241 [pool.info.maxSize - 1]: expect.any(Number),
246 const testHandler = () => console.info('test handler executed')
247 pool = new FixedThreadPool(
249 './tests/worker-files/thread/testWorker.mjs',
252 enableTasksQueue: true,
253 errorHandler: testHandler,
254 exitHandler: testHandler,
255 messageHandler: testHandler,
256 onlineHandler: testHandler,
257 restartWorkerOnError: false,
258 tasksQueueOptions: { concurrency: 2 },
259 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
260 workerChoiceStrategyOptions: {
261 runTime: { median: true },
262 weights: { 0: 300, 1: 200 },
266 expect(pool.emitter).toBeUndefined()
267 expect(pool.opts).toStrictEqual({
269 enableTasksQueue: true,
270 errorHandler: testHandler,
271 exitHandler: testHandler,
272 messageHandler: testHandler,
273 onlineHandler: testHandler,
274 restartWorkerOnError: false,
278 size: Math.pow(numberOfWorkers, 2),
279 tasksFinishedTimeout: 2000,
280 tasksStealingOnBackPressure: true,
281 tasksStealingRatio: 0.6,
284 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
285 workerChoiceStrategyOptions: {
286 runTime: { median: true },
287 weights: { 0: 300, 1: 200 },
290 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
291 .workerChoiceStrategies) {
292 expect(workerChoiceStrategy.opts).toStrictEqual({
293 elu: { median: false },
294 runTime: { median: true },
295 waitTime: { median: false },
296 weights: { 0: 300, 1: 200 },
302 it('Verify that pool options are validated', () => {
307 './tests/worker-files/thread/testWorker.mjs',
309 workerChoiceStrategy: 'invalidStrategy',
312 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
317 './tests/worker-files/thread/testWorker.mjs',
319 workerChoiceStrategyOptions: { weights: {} },
324 'Invalid worker choice strategy options: must have a weight for each worker node'
331 './tests/worker-files/thread/testWorker.mjs',
333 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
338 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
345 './tests/worker-files/thread/testWorker.mjs',
347 enableTasksQueue: true,
348 tasksQueueOptions: 'invalidTasksQueueOptions',
352 new TypeError('Invalid tasks queue options: must be a plain object')
358 './tests/worker-files/thread/testWorker.mjs',
360 enableTasksQueue: true,
361 tasksQueueOptions: { concurrency: 0 },
366 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
373 './tests/worker-files/thread/testWorker.mjs',
375 enableTasksQueue: true,
376 tasksQueueOptions: { concurrency: -1 },
381 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
388 './tests/worker-files/thread/testWorker.mjs',
390 enableTasksQueue: true,
391 tasksQueueOptions: { concurrency: 0.2 },
395 new TypeError('Invalid worker node tasks concurrency: must be an integer')
401 './tests/worker-files/thread/testWorker.mjs',
403 enableTasksQueue: true,
404 tasksQueueOptions: { size: 0 },
409 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
416 './tests/worker-files/thread/testWorker.mjs',
418 enableTasksQueue: true,
419 tasksQueueOptions: { size: -1 },
424 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
431 './tests/worker-files/thread/testWorker.mjs',
433 enableTasksQueue: true,
434 tasksQueueOptions: { size: 0.2 },
438 new TypeError('Invalid worker node tasks queue size: must be an integer')
444 './tests/worker-files/thread/testWorker.mjs',
446 enableTasksQueue: true,
447 tasksQueueOptions: { tasksStealingRatio: '' },
452 'Invalid worker node tasks stealing ratio: must be a number'
459 './tests/worker-files/thread/testWorker.mjs',
461 enableTasksQueue: true,
462 tasksQueueOptions: { tasksStealingRatio: 1.1 },
467 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
472 it('Verify that pool worker choice strategy options can be set', async () => {
473 const pool = new FixedThreadPool(
475 './tests/worker-files/thread/testWorker.mjs',
476 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
478 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
479 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
480 .workerChoiceStrategies) {
481 expect(workerChoiceStrategy.opts).toStrictEqual({
482 elu: { median: false },
483 runTime: { median: false },
484 waitTime: { median: false },
485 weights: expect.objectContaining({
486 0: expect.any(Number),
487 [pool.info.maxSize - 1]: expect.any(Number),
492 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
510 pool.setWorkerChoiceStrategyOptions({
511 elu: { median: true },
512 runTime: { median: true },
514 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
515 elu: { median: true },
516 runTime: { median: true },
518 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
519 .workerChoiceStrategies) {
520 expect(workerChoiceStrategy.opts).toStrictEqual({
521 elu: { median: true },
522 runTime: { median: true },
523 waitTime: { median: false },
524 weights: expect.objectContaining({
525 0: expect.any(Number),
526 [pool.info.maxSize - 1]: expect.any(Number),
531 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
549 pool.setWorkerChoiceStrategyOptions({
550 elu: { median: false },
551 runTime: { median: false },
553 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
554 elu: { median: false },
555 runTime: { median: false },
557 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
558 .workerChoiceStrategies) {
559 expect(workerChoiceStrategy.opts).toStrictEqual({
560 elu: { median: false },
561 runTime: { median: false },
562 waitTime: { median: false },
563 weights: expect.objectContaining({
564 0: expect.any(Number),
565 [pool.info.maxSize - 1]: expect.any(Number),
570 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
589 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
592 'Invalid worker choice strategy options: must be a plain object'
595 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
597 'Invalid worker choice strategy options: must have a weight for each worker node'
601 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
604 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
610 it('Verify that pool tasks queue can be enabled/disabled', async () => {
611 const pool = new FixedThreadPool(
613 './tests/worker-files/thread/testWorker.mjs'
615 expect(pool.opts.enableTasksQueue).toBe(false)
616 expect(pool.opts.tasksQueueOptions).toBeUndefined()
617 pool.enableTasksQueue(true)
618 expect(pool.opts.enableTasksQueue).toBe(true)
619 expect(pool.opts.tasksQueueOptions).toStrictEqual({
621 size: Math.pow(numberOfWorkers, 2),
622 tasksFinishedTimeout: 2000,
623 tasksStealingOnBackPressure: true,
624 tasksStealingRatio: 0.6,
627 pool.enableTasksQueue(true, { concurrency: 2 })
628 expect(pool.opts.enableTasksQueue).toBe(true)
629 expect(pool.opts.tasksQueueOptions).toStrictEqual({
631 size: Math.pow(numberOfWorkers, 2),
632 tasksFinishedTimeout: 2000,
633 tasksStealingOnBackPressure: true,
634 tasksStealingRatio: 0.6,
637 pool.enableTasksQueue(false)
638 expect(pool.opts.enableTasksQueue).toBe(false)
639 expect(pool.opts.tasksQueueOptions).toBeUndefined()
643 it('Verify that pool tasks queue options can be set', async () => {
644 const pool = new FixedThreadPool(
646 './tests/worker-files/thread/testWorker.mjs',
647 { enableTasksQueue: true }
649 expect(pool.opts.tasksQueueOptions).toStrictEqual({
651 size: Math.pow(numberOfWorkers, 2),
652 tasksFinishedTimeout: 2000,
653 tasksStealingOnBackPressure: true,
654 tasksStealingRatio: 0.6,
657 for (const workerNode of pool.workerNodes) {
658 expect(workerNode.tasksQueueBackPressureSize).toBe(
659 pool.opts.tasksQueueOptions.size
662 pool.setTasksQueueOptions({
665 tasksFinishedTimeout: 3000,
666 tasksStealingOnBackPressure: false,
667 tasksStealingRatio: 0.5,
670 expect(pool.opts.tasksQueueOptions).toStrictEqual({
673 tasksFinishedTimeout: 3000,
674 tasksStealingOnBackPressure: false,
675 tasksStealingRatio: 0.5,
678 for (const workerNode of pool.workerNodes) {
679 expect(workerNode.tasksQueueBackPressureSize).toBe(
680 pool.opts.tasksQueueOptions.size
683 pool.setTasksQueueOptions({
685 tasksStealingOnBackPressure: true,
688 expect(pool.opts.tasksQueueOptions).toStrictEqual({
691 tasksFinishedTimeout: 3000,
692 tasksStealingOnBackPressure: true,
693 tasksStealingRatio: 0.5,
696 for (const workerNode of pool.workerNodes) {
697 expect(workerNode.tasksQueueBackPressureSize).toBe(
698 pool.opts.tasksQueueOptions.size
701 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
702 new TypeError('Invalid tasks queue options: must be a plain object')
704 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
706 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
709 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
711 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
714 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
715 new TypeError('Invalid worker node tasks concurrency: must be an integer')
717 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
719 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
722 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
724 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
727 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
728 new TypeError('Invalid worker node tasks queue size: must be an integer')
730 expect(() => pool.setTasksQueueOptions({ tasksStealingRatio: '' })).toThrow(
732 'Invalid worker node tasks stealing ratio: must be a number'
736 pool.setTasksQueueOptions({ tasksStealingRatio: 1.1 })
739 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
745 it('Verify that pool info is set', async () => {
746 let pool = new FixedThreadPool(
748 './tests/worker-files/thread/testWorker.mjs'
750 expect(pool.info).toStrictEqual({
752 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
756 idleWorkerNodes: numberOfWorkers,
757 maxSize: numberOfWorkers,
758 minSize: numberOfWorkers,
762 type: PoolTypes.fixed,
764 worker: WorkerTypes.thread,
765 workerNodes: numberOfWorkers,
768 pool = new DynamicClusterPool(
769 Math.floor(numberOfWorkers / 2),
771 './tests/worker-files/cluster/testWorker.cjs'
773 expect(pool.info).toStrictEqual({
775 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
776 dynamicWorkerNodes: 0,
780 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
781 maxSize: numberOfWorkers,
782 minSize: Math.floor(numberOfWorkers / 2),
786 type: PoolTypes.dynamic,
788 worker: WorkerTypes.cluster,
789 workerNodes: Math.floor(numberOfWorkers / 2),
794 it('Verify that pool worker tasks usage are initialized', async () => {
795 const pool = new FixedClusterPool(
797 './tests/worker-files/cluster/testWorker.cjs'
799 for (const workerNode of pool.workerNodes) {
800 expect(workerNode).toBeInstanceOf(WorkerNode)
801 expect(workerNode.usage).toStrictEqual({
804 history: expect.any(CircularBuffer),
807 history: expect.any(CircularBuffer),
811 history: expect.any(CircularBuffer),
819 sequentiallyStolen: 0,
823 history: expect.any(CircularBuffer),
830 it('Verify that pool worker tasks queue are initialized', async () => {
831 let pool = new FixedClusterPool(
833 './tests/worker-files/cluster/testWorker.cjs'
835 for (const workerNode of pool.workerNodes) {
836 expect(workerNode).toBeInstanceOf(WorkerNode)
837 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
838 expect(workerNode.tasksQueue.size).toBe(0)
839 expect(workerNode.tasksQueue.maxSize).toBe(0)
840 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
841 expect(workerNode.tasksQueue.enablePriority).toBe(false)
844 pool = new DynamicThreadPool(
845 Math.floor(numberOfWorkers / 2),
847 './tests/worker-files/thread/testWorker.mjs'
849 for (const workerNode of pool.workerNodes) {
850 expect(workerNode).toBeInstanceOf(WorkerNode)
851 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
852 expect(workerNode.tasksQueue.size).toBe(0)
853 expect(workerNode.tasksQueue.maxSize).toBe(0)
854 expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
855 expect(workerNode.tasksQueue.enablePriority).toBe(false)
860 it('Verify that pool worker info are initialized', async () => {
861 let pool = new FixedClusterPool(
863 './tests/worker-files/cluster/testWorker.cjs'
865 for (const workerNode of pool.workerNodes) {
866 expect(workerNode).toBeInstanceOf(WorkerNode)
867 expect(workerNode.info).toStrictEqual({
869 backPressureStealing: false,
870 continuousStealing: false,
872 id: expect.any(Number),
876 type: WorkerTypes.cluster,
880 pool = new DynamicThreadPool(
881 Math.floor(numberOfWorkers / 2),
883 './tests/worker-files/thread/testWorker.mjs'
885 for (const workerNode of pool.workerNodes) {
886 expect(workerNode).toBeInstanceOf(WorkerNode)
887 expect(workerNode.info).toStrictEqual({
889 backPressureStealing: false,
890 continuousStealing: false,
892 id: expect.any(Number),
896 type: WorkerTypes.thread,
902 it('Verify that pool statuses are checked at start or destroy', async () => {
903 const pool = new FixedThreadPool(
905 './tests/worker-files/thread/testWorker.mjs'
907 expect(pool.info.started).toBe(true)
908 expect(pool.info.ready).toBe(true)
909 expect(() => pool.start()).toThrow(
910 new Error('Cannot start an already started pool')
913 expect(pool.info.started).toBe(false)
914 expect(pool.info.ready).toBe(false)
915 await expect(pool.destroy()).rejects.toThrow(
916 new Error('Cannot destroy an already destroyed pool')
920 it('Verify that pool can be started after initialization', async () => {
921 const pool = new FixedClusterPool(
923 './tests/worker-files/cluster/testWorker.cjs',
928 expect(pool.info.started).toBe(false)
929 expect(pool.info.ready).toBe(false)
930 expect(pool.workerNodes).toStrictEqual([])
931 expect(pool.readyEventEmitted).toBe(false)
932 expect(pool.busyEventEmitted).toBe(false)
933 expect(pool.backPressureEventEmitted).toBe(false)
935 expect(pool.info.started).toBe(true)
936 expect(pool.info.ready).toBe(true)
937 await waitPoolEvents(pool, PoolEvents.ready, 1)
938 expect(pool.readyEventEmitted).toBe(true)
939 expect(pool.busyEventEmitted).toBe(false)
940 expect(pool.backPressureEventEmitted).toBe(false)
941 expect(pool.workerNodes.length).toBe(numberOfWorkers)
942 for (const workerNode of pool.workerNodes) {
943 expect(workerNode).toBeInstanceOf(WorkerNode)
948 it('Verify that pool execute() arguments are checked', async () => {
949 const pool = new FixedClusterPool(
951 './tests/worker-files/cluster/testWorker.cjs'
953 await expect(pool.execute(undefined, 0)).rejects.toThrow(
954 new TypeError('name argument must be a string')
956 await expect(pool.execute(undefined, '')).rejects.toThrow(
957 new TypeError('name argument must not be an empty string')
959 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
960 new TypeError('transferList argument must be an array')
962 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
963 "Task function 'unknown' not found"
966 await expect(pool.execute()).rejects.toThrow(
967 new Error('Cannot execute a task on not started pool')
971 it('Verify that pool worker tasks usage are computed', async () => {
972 const pool = new FixedClusterPool(
974 './tests/worker-files/cluster/testWorker.cjs'
976 const promises = new Set()
977 const maxMultiplier = 2
978 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
979 promises.add(pool.execute())
981 for (const workerNode of pool.workerNodes) {
982 expect(workerNode.usage).toStrictEqual({
985 history: expect.any(CircularBuffer),
988 history: expect.any(CircularBuffer),
992 history: expect.any(CircularBuffer),
996 executing: maxMultiplier,
1000 sequentiallyStolen: 0,
1004 history: expect.any(CircularBuffer),
1008 await Promise.all(promises)
1009 for (const workerNode of pool.workerNodes) {
1010 expect(workerNode.usage).toStrictEqual({
1013 history: expect.any(CircularBuffer),
1016 history: expect.any(CircularBuffer),
1020 history: expect.any(CircularBuffer),
1023 executed: maxMultiplier,
1028 sequentiallyStolen: 0,
1032 history: expect.any(CircularBuffer),
1036 await pool.destroy()
1039 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
1040 const pool = new DynamicThreadPool(
1041 Math.floor(numberOfWorkers / 2),
1043 './tests/worker-files/thread/testWorker.mjs'
1045 const promises = new Set()
1046 const maxMultiplier = 2
1047 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1048 promises.add(pool.execute())
1050 await Promise.all(promises)
1051 for (const workerNode of pool.workerNodes) {
1052 expect(workerNode.usage).toStrictEqual({
1055 history: expect.any(CircularBuffer),
1058 history: expect.any(CircularBuffer),
1062 history: expect.any(CircularBuffer),
1065 executed: expect.any(Number),
1070 sequentiallyStolen: 0,
1074 history: expect.any(CircularBuffer),
1077 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1078 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1079 numberOfWorkers * maxMultiplier
1082 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1083 for (const workerNode of pool.workerNodes) {
1084 expect(workerNode.usage).toStrictEqual({
1087 history: expect.any(CircularBuffer),
1090 history: expect.any(CircularBuffer),
1094 history: expect.any(CircularBuffer),
1097 executed: expect.any(Number),
1102 sequentiallyStolen: 0,
1106 history: expect.any(CircularBuffer),
1109 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1110 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1111 numberOfWorkers * maxMultiplier
1114 await pool.destroy()
1117 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1118 const pool = new DynamicClusterPool(
1119 Math.floor(numberOfWorkers / 2),
1121 './tests/worker-files/cluster/testWorker.cjs'
1123 expect(pool.emitter.eventNames()).toStrictEqual([])
1126 pool.emitter.on(PoolEvents.ready, info => {
1130 await waitPoolEvents(pool, PoolEvents.ready, 1)
1131 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1132 expect(poolReady).toBe(1)
1133 expect(poolInfo).toStrictEqual({
1135 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1136 dynamicWorkerNodes: 0,
1140 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
1141 maxSize: numberOfWorkers,
1142 minSize: Math.floor(numberOfWorkers / 2),
1145 strategyRetries: expect.any(Number),
1146 type: PoolTypes.dynamic,
1148 worker: WorkerTypes.cluster,
1149 workerNodes: Math.floor(numberOfWorkers / 2),
1151 await pool.destroy()
1154 it("Verify that pool event emitter 'busy' and 'busyEnd' events can register a callback", async () => {
1155 const pool = new FixedThreadPool(
1157 './tests/worker-files/thread/testWorker.mjs'
1159 expect(pool.emitter.eventNames()).toStrictEqual([])
1160 const promises = new Set()
1163 pool.emitter.on(PoolEvents.busy, info => {
1169 pool.emitter.on(PoolEvents.busyEnd, info => {
1171 poolBusyEndInfo = info
1173 expect(pool.emitter.eventNames()).toStrictEqual([
1177 for (let i = 0; i < numberOfWorkers * 2; i++) {
1178 promises.add(pool.execute())
1180 await Promise.all(promises)
1181 expect(poolBusy).toBe(1)
1182 expect(poolBusyInfo).toStrictEqual({
1183 busyWorkerNodes: numberOfWorkers,
1184 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1185 executedTasks: expect.any(Number),
1186 executingTasks: expect.any(Number),
1187 failedTasks: expect.any(Number),
1189 maxSize: numberOfWorkers,
1190 minSize: numberOfWorkers,
1193 strategyRetries: expect.any(Number),
1194 type: PoolTypes.fixed,
1196 worker: WorkerTypes.thread,
1197 workerNodes: numberOfWorkers,
1199 expect(poolBusyEnd).toBe(1)
1200 expect(poolBusyEndInfo).toStrictEqual({
1201 busyWorkerNodes: expect.any(Number),
1202 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1203 executedTasks: expect.any(Number),
1204 executingTasks: expect.any(Number),
1205 failedTasks: expect.any(Number),
1206 idleWorkerNodes: expect.any(Number),
1207 maxSize: numberOfWorkers,
1208 minSize: numberOfWorkers,
1211 strategyRetries: expect.any(Number),
1212 type: PoolTypes.fixed,
1214 worker: WorkerTypes.thread,
1215 workerNodes: numberOfWorkers,
1217 expect(poolBusyEndInfo.busyWorkerNodes).toBeLessThan(numberOfWorkers)
1218 await pool.destroy()
1221 it("Verify that pool event emitter 'full' and 'fullEnd' events can register a callback", async () => {
1222 const pool = new DynamicClusterPool(
1223 Math.floor(numberOfWorkers / 2),
1225 './tests/worker-files/cluster/testWorker.cjs'
1227 expect(pool.emitter.eventNames()).toStrictEqual([])
1228 const promises = new Set()
1231 pool.emitter.on(PoolEvents.full, info => {
1237 pool.emitter.on(PoolEvents.fullEnd, info => {
1239 poolFullEndInfo = info
1241 expect(pool.emitter.eventNames()).toStrictEqual([
1245 for (let i = 0; i < numberOfWorkers * 2; i++) {
1246 promises.add(pool.execute())
1248 await Promise.all(promises)
1249 expect(poolFull).toBe(1)
1250 expect(poolFullInfo).toStrictEqual({
1251 busyWorkerNodes: expect.any(Number),
1252 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1253 dynamicWorkerNodes: Math.floor(numberOfWorkers / 2),
1254 executedTasks: expect.any(Number),
1255 executingTasks: expect.any(Number),
1256 failedTasks: expect.any(Number),
1257 idleWorkerNodes: expect.any(Number),
1258 maxSize: numberOfWorkers,
1259 minSize: Math.floor(numberOfWorkers / 2),
1262 strategyRetries: expect.any(Number),
1263 type: PoolTypes.dynamic,
1265 worker: WorkerTypes.cluster,
1266 workerNodes: numberOfWorkers,
1268 await waitPoolEvents(pool, PoolEvents.fullEnd, 1)
1269 expect(poolFullEnd).toBe(1)
1270 expect(poolFullEndInfo).toStrictEqual({
1271 busyWorkerNodes: expect.any(Number),
1272 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1273 dynamicWorkerNodes: 0,
1274 executedTasks: expect.any(Number),
1275 executingTasks: expect.any(Number),
1276 failedTasks: expect.any(Number),
1277 idleWorkerNodes: expect.any(Number),
1278 maxSize: numberOfWorkers,
1279 minSize: Math.floor(numberOfWorkers / 2),
1282 strategyRetries: expect.any(Number),
1283 type: PoolTypes.dynamic,
1285 worker: WorkerTypes.cluster,
1286 workerNodes: Math.floor(numberOfWorkers / 2),
1288 await pool.destroy()
1291 it("Verify that pool event emitter 'backPressure' and 'backPressureEnd' events can register a callback", async () => {
1292 const pool = new FixedThreadPool(
1294 './tests/worker-files/thread/testWorker.mjs',
1296 enableTasksQueue: true,
1299 expect(pool.emitter.eventNames()).toStrictEqual([])
1300 const promises = new Set()
1301 let poolBackPressure = 0
1302 let poolBackPressureInfo
1303 pool.emitter.on(PoolEvents.backPressure, info => {
1305 poolBackPressureInfo = info
1307 let poolBackPressureEnd = 0
1308 let poolBackPressureEndInfo
1309 pool.emitter.on(PoolEvents.backPressureEnd, info => {
1310 ++poolBackPressureEnd
1311 poolBackPressureEndInfo = info
1313 expect(pool.emitter.eventNames()).toStrictEqual([
1314 PoolEvents.backPressure,
1315 PoolEvents.backPressureEnd,
1317 for (let i = 0; i < numberOfWorkers * 10; i++) {
1318 promises.add(pool.execute())
1320 await Promise.all(promises)
1321 expect(poolBackPressure).toBe(1)
1322 expect(poolBackPressureInfo).toStrictEqual({
1324 backPressureWorkerNodes: numberOfWorkers,
1325 busyWorkerNodes: expect.any(Number),
1326 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1327 executedTasks: expect.any(Number),
1328 executingTasks: expect.any(Number),
1329 failedTasks: expect.any(Number),
1330 idleWorkerNodes: expect.any(Number),
1331 maxQueuedTasks: expect.any(Number),
1332 maxSize: numberOfWorkers,
1333 minSize: numberOfWorkers,
1334 queuedTasks: expect.any(Number),
1337 stealingWorkerNodes: expect.any(Number),
1338 stolenTasks: expect.any(Number),
1339 strategyRetries: expect.any(Number),
1340 type: PoolTypes.fixed,
1342 worker: WorkerTypes.thread,
1343 workerNodes: numberOfWorkers,
1345 expect(poolBackPressureEnd).toBe(1)
1346 expect(poolBackPressureEndInfo).toStrictEqual({
1347 backPressure: false,
1348 backPressureWorkerNodes: expect.any(Number),
1349 busyWorkerNodes: expect.any(Number),
1350 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1351 executedTasks: expect.any(Number),
1352 executingTasks: expect.any(Number),
1353 failedTasks: expect.any(Number),
1354 idleWorkerNodes: expect.any(Number),
1355 maxQueuedTasks: expect.any(Number),
1356 maxSize: numberOfWorkers,
1357 minSize: numberOfWorkers,
1358 queuedTasks: expect.any(Number),
1361 stealingWorkerNodes: expect.any(Number),
1362 stolenTasks: expect.any(Number),
1363 strategyRetries: expect.any(Number),
1364 type: PoolTypes.fixed,
1366 worker: WorkerTypes.thread,
1367 workerNodes: numberOfWorkers,
1369 expect(poolBackPressureEndInfo.backPressureWorkerNodes).toBeLessThan(
1372 await pool.destroy()
1375 it("Verify that pool event emitter 'empty' event can register a callback", async () => {
1376 const pool = new DynamicClusterPool(
1379 './tests/worker-files/cluster/testWorker.cjs'
1381 expect(pool.emitter.eventNames()).toStrictEqual([])
1382 const promises = new Set()
1385 pool.emitter.on(PoolEvents.empty, info => {
1389 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.empty])
1390 for (let i = 0; i < numberOfWorkers; i++) {
1391 promises.add(pool.execute())
1393 await Promise.all(promises)
1394 await waitPoolEvents(pool, PoolEvents.empty, 1)
1395 expect(poolEmpty).toBe(1)
1396 expect(poolInfo).toStrictEqual({
1398 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1399 dynamicWorkerNodes: 0,
1400 executedTasks: expect.any(Number),
1401 executingTasks: expect.any(Number),
1402 failedTasks: expect.any(Number),
1404 maxSize: numberOfWorkers,
1408 strategyRetries: expect.any(Number),
1409 type: PoolTypes.dynamic,
1411 worker: WorkerTypes.cluster,
1414 await pool.destroy()
1417 it('Verify that destroy() waits for queued tasks to finish', async () => {
1418 const tasksFinishedTimeout = 2500
1419 const pool = new FixedThreadPool(
1421 './tests/worker-files/thread/asyncWorker.mjs',
1423 enableTasksQueue: true,
1424 tasksQueueOptions: { tasksFinishedTimeout },
1427 const maxMultiplier = 4
1428 let tasksFinished = 0
1429 for (const workerNode of pool.workerNodes) {
1430 workerNode.on('taskFinished', () => {
1434 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1437 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1438 const startTime = performance.now()
1439 await pool.destroy()
1440 const elapsedTime = performance.now() - startTime
1441 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1442 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1443 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1446 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1447 const tasksFinishedTimeout = 1000
1448 const pool = new FixedThreadPool(
1450 './tests/worker-files/thread/asyncWorker.mjs',
1452 enableTasksQueue: true,
1453 tasksQueueOptions: { tasksFinishedTimeout },
1456 const maxMultiplier = 4
1457 let tasksFinished = 0
1458 for (const workerNode of pool.workerNodes) {
1459 workerNode.on('taskFinished', () => {
1463 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1466 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1467 const startTime = performance.now()
1468 await pool.destroy()
1469 const elapsedTime = performance.now() - startTime
1470 expect(tasksFinished).toBe(0)
1471 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1474 it('Verify that pool asynchronous resource track tasks execution', async () => {
1479 let resolveCalls = 0
1480 const hook = createHook({
1482 if (asyncId === taskAsyncId) afterCalls++
1485 if (asyncId === taskAsyncId) beforeCalls++
1487 init (asyncId, type) {
1488 if (type === 'poolifier:task') {
1490 taskAsyncId = asyncId
1494 if (executionAsyncId() === taskAsyncId) resolveCalls++
1497 const pool = new FixedThreadPool(
1499 './tests/worker-files/thread/testWorker.mjs'
1502 await pool.execute()
1504 expect(initCalls).toBe(1)
1505 expect(beforeCalls).toBe(1)
1506 expect(afterCalls).toBe(1)
1507 expect(resolveCalls).toBe(1)
1508 await pool.destroy()
1511 it('Verify that hasTaskFunction() is working', async () => {
1512 const dynamicThreadPool = new DynamicThreadPool(
1513 Math.floor(numberOfWorkers / 2),
1515 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1517 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1518 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1519 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1522 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1523 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1524 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1525 await dynamicThreadPool.destroy()
1526 const fixedClusterPool = new FixedClusterPool(
1528 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1530 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1531 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1532 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1535 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1536 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1537 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1538 await fixedClusterPool.destroy()
1541 it('Verify that addTaskFunction() is working', async () => {
1542 const dynamicThreadPool = new DynamicThreadPool(
1543 Math.floor(numberOfWorkers / 2),
1545 './tests/worker-files/thread/testWorker.mjs'
1547 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1549 dynamicThreadPool.addTaskFunction(0, () => {})
1550 ).rejects.toThrow(new TypeError('name argument must be a string'))
1552 dynamicThreadPool.addTaskFunction('', () => {})
1554 new TypeError('name argument must not be an empty string')
1556 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1557 new TypeError('taskFunction property must be a function')
1559 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1560 new TypeError('taskFunction property must be a function')
1563 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1564 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1566 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1567 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1569 dynamicThreadPool.addTaskFunction('test', {
1571 taskFunction: () => {},
1574 new RangeError("Property 'priority' must be between -20 and 19")
1577 dynamicThreadPool.addTaskFunction('test', {
1579 taskFunction: () => {},
1582 new RangeError("Property 'priority' must be between -20 and 19")
1585 dynamicThreadPool.addTaskFunction('test', {
1586 strategy: 'invalidStrategy',
1587 taskFunction: () => {},
1590 new Error("Invalid worker choice strategy 'invalidStrategy'")
1592 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1593 { name: DEFAULT_TASK_NAME },
1597 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1598 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1599 const echoTaskFunction = data => {
1603 dynamicThreadPool.addTaskFunction('echo', {
1604 strategy: WorkerChoiceStrategies.LEAST_ELU,
1605 taskFunction: echoTaskFunction,
1607 ).resolves.toBe(true)
1608 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1609 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1610 strategy: WorkerChoiceStrategies.LEAST_ELU,
1611 taskFunction: echoTaskFunction,
1614 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1616 WorkerChoiceStrategies.ROUND_ROBIN,
1617 WorkerChoiceStrategies.LEAST_ELU,
1619 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1620 { name: DEFAULT_TASK_NAME },
1622 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1624 const taskFunctionData = { test: 'test' }
1625 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1626 expect(echoResult).toStrictEqual(taskFunctionData)
1627 for (const workerNode of dynamicThreadPool.workerNodes) {
1628 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1629 elu: expect.objectContaining({
1630 active: expect.objectContaining({
1631 history: expect.any(CircularBuffer),
1633 idle: expect.objectContaining({
1634 history: expect.any(CircularBuffer),
1638 history: expect.any(CircularBuffer),
1641 executed: expect.any(Number),
1645 sequentiallyStolen: 0,
1649 history: expect.any(CircularBuffer),
1653 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1654 ).toBeGreaterThan(0)
1656 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1660 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1664 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1665 ).toBeGreaterThan(0)
1668 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1671 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1675 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1676 ).toBeGreaterThanOrEqual(0)
1679 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1682 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1686 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1687 ).toBeGreaterThanOrEqual(0)
1689 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1690 ).toBeLessThanOrEqual(1)
1693 await dynamicThreadPool.destroy()
1696 it('Verify that removeTaskFunction() is working', async () => {
1697 const dynamicThreadPool = new DynamicThreadPool(
1698 Math.floor(numberOfWorkers / 2),
1700 './tests/worker-files/thread/testWorker.mjs'
1702 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1703 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1704 { name: DEFAULT_TASK_NAME },
1707 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1708 new Error('Cannot remove a task function not handled on the pool side')
1710 const echoTaskFunction = data => {
1713 await dynamicThreadPool.addTaskFunction('echo', {
1714 strategy: WorkerChoiceStrategies.LEAST_ELU,
1715 taskFunction: echoTaskFunction,
1717 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1718 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1719 strategy: WorkerChoiceStrategies.LEAST_ELU,
1720 taskFunction: echoTaskFunction,
1723 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1725 WorkerChoiceStrategies.ROUND_ROBIN,
1726 WorkerChoiceStrategies.LEAST_ELU,
1728 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1729 { name: DEFAULT_TASK_NAME },
1731 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
1733 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1736 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1737 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1739 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
1740 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1741 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1742 { name: DEFAULT_TASK_NAME },
1745 await dynamicThreadPool.destroy()
1748 it('Verify that listTaskFunctionsProperties() is working', async () => {
1749 const dynamicThreadPool = new DynamicThreadPool(
1750 Math.floor(numberOfWorkers / 2),
1752 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1754 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1755 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1756 { name: DEFAULT_TASK_NAME },
1757 { name: 'factorial' },
1758 { name: 'fibonacci' },
1759 { name: 'jsonIntegerSerialization' },
1761 await dynamicThreadPool.destroy()
1762 const fixedClusterPool = new FixedClusterPool(
1764 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1766 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1767 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1768 { name: DEFAULT_TASK_NAME },
1769 { name: 'factorial' },
1770 { name: 'fibonacci' },
1771 { name: 'jsonIntegerSerialization' },
1773 await fixedClusterPool.destroy()
1776 it('Verify that setDefaultTaskFunction() is working', async () => {
1777 const dynamicThreadPool = new DynamicThreadPool(
1778 Math.floor(numberOfWorkers / 2),
1780 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1782 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1783 const workerId = dynamicThreadPool.workerNodes[0].info.id
1784 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1786 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1790 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1793 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1797 dynamicThreadPool.setDefaultTaskFunction('unknown')
1800 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1803 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1804 { name: DEFAULT_TASK_NAME },
1805 { name: 'factorial' },
1806 { name: 'fibonacci' },
1807 { name: 'jsonIntegerSerialization' },
1810 dynamicThreadPool.setDefaultTaskFunction('factorial')
1811 ).resolves.toBe(true)
1812 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1813 { name: DEFAULT_TASK_NAME },
1814 { name: 'factorial' },
1815 { name: 'fibonacci' },
1816 { name: 'jsonIntegerSerialization' },
1819 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1820 ).resolves.toBe(true)
1821 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1822 { name: DEFAULT_TASK_NAME },
1823 { name: 'fibonacci' },
1824 { name: 'factorial' },
1825 { name: 'jsonIntegerSerialization' },
1827 await dynamicThreadPool.destroy()
1830 it('Verify that multiple task functions worker is working', async () => {
1831 const pool = new DynamicClusterPool(
1832 Math.floor(numberOfWorkers / 2),
1834 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1836 const data = { n: 10 }
1837 const result0 = await pool.execute(data)
1838 expect(result0).toStrictEqual(3628800)
1839 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1840 expect(result1).toStrictEqual({ ok: 1 })
1841 const result2 = await pool.execute(data, 'factorial')
1842 expect(result2).toBe(3628800)
1843 const result3 = await pool.execute(data, 'fibonacci')
1844 expect(result3).toBe(55)
1845 expect(pool.info.executingTasks).toBe(0)
1846 expect(pool.info.executedTasks).toBe(4)
1847 for (const workerNode of pool.workerNodes) {
1848 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1849 { name: DEFAULT_TASK_NAME },
1850 { name: 'factorial' },
1851 { name: 'fibonacci' },
1852 { name: 'jsonIntegerSerialization' },
1854 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1855 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1856 expect(workerNode.tasksQueue.enablePriority).toBe(false)
1857 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1859 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1863 history: expect.any(CircularBuffer),
1866 history: expect.any(CircularBuffer),
1870 history: expect.any(CircularBuffer),
1873 executed: expect.any(Number),
1877 sequentiallyStolen: 0,
1881 history: expect.any(CircularBuffer),
1885 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1887 ).toBeGreaterThan(0)
1890 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1892 workerNode.getTaskFunctionWorkerUsage(
1893 workerNode.info.taskFunctionsProperties[1].name
1897 await pool.destroy()
1900 it('Verify that mapExecute() is working', async () => {
1901 const pool = new DynamicThreadPool(
1902 Math.floor(numberOfWorkers / 2),
1904 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1906 await expect(pool.mapExecute()).rejects.toThrow(
1907 new TypeError('data argument must be a defined iterable')
1909 await expect(pool.mapExecute(0)).rejects.toThrow(
1910 new TypeError('data argument must be an iterable')
1912 await expect(pool.mapExecute([undefined], 0)).rejects.toThrow(
1913 new TypeError('name argument must be a string')
1915 await expect(pool.mapExecute([undefined], '')).rejects.toThrow(
1916 new TypeError('name argument must not be an empty string')
1918 await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
1919 new TypeError('transferList argument must be an array')
1921 await expect(pool.mapExecute([undefined], 'unknown')).rejects.toBe(
1922 "Task function 'unknown' not found"
1924 let results = await pool.mapExecute(
1926 'jsonIntegerSerialization'
1928 expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }])
1929 expect(pool.info.executingTasks).toBe(0)
1930 expect(pool.info.executedTasks).toBe(4)
1931 results = await pool.mapExecute(
1932 [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }],
1935 expect(results).toStrictEqual([
1936 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1938 expect(pool.info.executingTasks).toBe(0)
1939 expect(pool.info.executedTasks).toBe(8)
1940 results = await pool.mapExecute(
1941 new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
1944 expect(results).toStrictEqual([
1945 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47,
1947 expect(pool.info.executingTasks).toBe(0)
1948 expect(pool.info.executedTasks).toBe(12)
1949 await pool.destroy()
1950 await expect(pool.mapExecute()).rejects.toThrow(
1951 new Error('Cannot execute task(s) on not started pool')
1955 it('Verify that task function objects worker is working', async () => {
1956 const pool = new DynamicThreadPool(
1957 Math.floor(numberOfWorkers / 2),
1959 './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
1961 const data = { n: 10 }
1962 const result0 = await pool.execute(data)
1963 expect(result0).toStrictEqual(3628800)
1964 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1965 expect(result1).toStrictEqual({ ok: 1 })
1966 const result2 = await pool.execute(data, 'factorial')
1967 expect(result2).toBe(3628800)
1968 const result3 = await pool.execute(data, 'fibonacci')
1969 expect(result3).toBe(55)
1970 expect(pool.info.executingTasks).toBe(0)
1971 expect(pool.info.executedTasks).toBe(4)
1972 for (const workerNode of pool.workerNodes) {
1973 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1974 { name: DEFAULT_TASK_NAME },
1975 { name: 'factorial' },
1976 { name: 'fibonacci', priority: -5 },
1977 { name: 'jsonIntegerSerialization' },
1979 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1980 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1981 expect(workerNode.tasksQueue.enablePriority).toBe(true)
1982 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1984 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1988 history: expect.any(CircularBuffer),
1991 history: expect.any(CircularBuffer),
1995 history: expect.any(CircularBuffer),
1998 executed: expect.any(Number),
2002 sequentiallyStolen: 0,
2006 history: expect.any(CircularBuffer),
2010 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
2012 ).toBeGreaterThan(0)
2015 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
2017 workerNode.getTaskFunctionWorkerUsage(
2018 workerNode.info.taskFunctionsProperties[1].name
2022 await pool.destroy()
2025 it('Verify sendKillMessageToWorker()', async () => {
2026 const pool = new DynamicClusterPool(
2027 Math.floor(numberOfWorkers / 2),
2029 './tests/worker-files/cluster/testWorker.cjs'
2031 const workerNodeKey = 0
2033 pool.sendKillMessageToWorker(workerNodeKey)
2034 ).resolves.toBeUndefined()
2035 await pool.destroy()
2038 it('Verify sendTaskFunctionOperationToWorker()', async () => {
2039 const pool = new DynamicClusterPool(
2040 Math.floor(numberOfWorkers / 2),
2042 './tests/worker-files/cluster/testWorker.cjs'
2044 const workerNodeKey = 0
2046 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
2047 taskFunction: (() => {}).toString(),
2048 taskFunctionOperation: 'add',
2049 taskFunctionProperties: { name: 'empty' },
2051 ).resolves.toBe(true)
2053 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
2055 { name: DEFAULT_TASK_NAME },
2059 await pool.destroy()
2062 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
2063 const pool = new DynamicClusterPool(
2064 Math.floor(numberOfWorkers / 2),
2066 './tests/worker-files/cluster/testWorker.cjs'
2069 pool.sendTaskFunctionOperationToWorkers({
2070 taskFunction: (() => {}).toString(),
2071 taskFunctionOperation: 'add',
2072 taskFunctionProperties: { name: 'empty' },
2074 ).resolves.toBe(true)
2075 for (const workerNode of pool.workerNodes) {
2076 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
2077 { name: DEFAULT_TASK_NAME },
2082 await pool.destroy()