X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=0c91349765f65c80884d4c50d8b45d93424e37fd;hb=f12182ad6dc553c7a5dfeee01bcde65c0177f671;hp=41f4751fcae23153cbfab362245e105af376ba37;hpb=bcf85f7f89db68417a18393ad629fc26a2383841;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 41f4751f..0c913497 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1,9 +1,14 @@ +// eslint-disable-next-line n/no-unsupported-features/node-builtins +import { createHook, executionAsyncId } from 'node:async_hooks' import { EventEmitterAsyncResource } from 'node:events' -import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' +import { dirname, join } from 'node:path' import { fileURLToPath } from 'node:url' + import { expect } from 'expect' import { restore, stub } from 'sinon' + +import { CircularBuffer } from '../../lib/circular-buffer.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -13,12 +18,11 @@ import { PoolTypes, WorkerChoiceStrategies, WorkerTypes -} from '../../lib/index.js' -import { CircularArray } from '../../lib/circular-array.js' -import { Deque } from '../../lib/deque.js' -import { DEFAULT_TASK_NAME } from '../../lib/utils.js' -import { waitPoolEvents } from '../test-utils.js' -import { WorkerNode } from '../../lib/pools/worker-node.js' +} from '../../lib/index.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' +import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' +import { waitPoolEvents } from '../test-utils.cjs' describe('Abstract pool test suite', () => { const version = JSON.parse( @@ -69,14 +73,18 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs' ) - expect(pool.starting).toBe(false) expect(pool.started).toBe(true) + expect(pool.starting).toBe(false) + expect(pool.destroying).toBe(false) await pool.destroy() }) it('Verify that filePath is checked', () => { expect(() => new FixedThreadPool(numberOfWorkers)).toThrow( - new Error("Cannot find the worker file 'undefined'") + new TypeError('The worker file path must be specified') + ) + expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow( + new TypeError('The worker file path must be a string') ) expect( () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts') @@ -100,7 +108,7 @@ describe('Abstract pool test suite', () => { it('Verify that a negative number of workers is checked', () => { expect( () => - new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js') + new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs') ).toThrow( new RangeError( 'Cannot instantiate a pool with a negative number of workers' @@ -119,13 +127,29 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => new DynamicClusterPool( 1, undefined, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new TypeError( @@ -149,7 +173,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 0, 0.5, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new TypeError( @@ -185,7 +209,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 1, 1, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new RangeError( @@ -200,32 +224,24 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } await pool.destroy() @@ -259,14 +275,12 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -274,17 +288,9 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } - }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -305,38 +311,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -472,29 +446,21 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -502,8 +468,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -517,28 +483,23 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: true } - }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, - elu: { median: true } + elu: { median: true }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -546,8 +507,8 @@ describe('Abstract pool test suite', () => { median: true }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -561,28 +522,23 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -590,8 +546,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -607,20 +563,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -649,7 +591,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -657,7 +600,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -675,7 +619,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -686,13 +631,15 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -708,7 +655,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -758,7 +706,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, workerNodes: numberOfWorkers, @@ -772,7 +721,7 @@ describe('Abstract pool test suite', () => { pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.info).toStrictEqual({ version, @@ -780,7 +729,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, workerNodes: Math.floor(numberOfWorkers / 2), @@ -796,7 +746,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are initialized', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -806,21 +756,22 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -831,13 +782,14 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks queue are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2) } await pool.destroy() pool = new DynamicThreadPool( @@ -847,9 +799,10 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2) } await pool.destroy() }) @@ -857,7 +810,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker info are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -865,7 +818,9 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.cluster, dynamic: false, - ready: true + ready: true, + stealing: false, + backPressure: false }) } await pool.destroy() @@ -880,16 +835,36 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true + ready: true, + stealing: false, + backPressure: false }) } await pool.destroy() }) + it('Verify that pool statuses are checked at start or destroy', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(() => pool.start()).toThrow( + new Error('Cannot start an already started pool') + ) + await pool.destroy() + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + await expect(pool.destroy()).rejects.toThrow( + new Error('Cannot destroy an already destroyed pool') + ) + }) + it('Verify that pool can be started after initialization', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js', + './tests/worker-files/cluster/testWorker.cjs', { startWorkers: false } @@ -897,12 +872,15 @@ describe('Abstract pool test suite', () => { expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) expect(pool.workerNodes).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) + await waitPoolEvents(pool, PoolEvents.ready, 1) + expect(pool.readyEventEmitted).toBe(true) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -913,7 +891,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool execute() arguments are checked', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) await expect(pool.execute(undefined, 0)).rejects.toThrow( new TypeError('name argument must be a string') @@ -936,7 +914,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are computed', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const promises = new Set() const maxMultiplier = 2 @@ -950,21 +928,22 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -977,21 +956,22 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -999,7 +979,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1018,21 +998,22 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -1040,41 +1021,38 @@ describe('Abstract pool test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier ) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) } await pool.destroy() }) @@ -1083,7 +1061,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.emitter.eventNames()).toStrictEqual([]) let poolInfo @@ -1101,7 +1079,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1141,7 +1120,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1180,7 +1160,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1222,11 +1203,13 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), + stealingWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), @@ -1236,7 +1219,101 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.called).toBe(true) + expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) + await pool.destroy() + }) + + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) + }) + + it('Verify that pool asynchronous resource track tasks execution', async () => { + let taskAsyncId + let initCalls = 0 + let beforeCalls = 0 + let afterCalls = 0 + let resolveCalls = 0 + const hook = createHook({ + init (asyncId, type) { + if (type === 'poolifier:task') { + initCalls++ + taskAsyncId = asyncId + } + }, + before (asyncId) { + if (asyncId === taskAsyncId) beforeCalls++ + }, + after (asyncId) { + if (asyncId === taskAsyncId) afterCalls++ + }, + promiseResolve () { + if (executionAsyncId() === taskAsyncId) resolveCalls++ + } + }) + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + hook.enable() + await pool.execute() + hook.disable() + expect(initCalls).toBe(1) + expect(beforeCalls).toBe(1) + expect(afterCalls).toBe(1) + expect(resolveCalls).toBe(1) await pool.destroy() }) @@ -1257,7 +1334,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) @@ -1286,29 +1363,72 @@ describe('Abstract pool test suite', () => { new TypeError('name argument must not be an empty string') ) await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') ) await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: '' }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: -21 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: 20 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + strategy: 'invalidStrategy' + }) + ).rejects.toThrow( + new Error("Invalid worker choice strategy 'invalidStrategy'") ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1319,24 +1439,65 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularBuffer) + }), + active: expect.objectContaining({ + history: expect.any(CircularBuffer) + }) + }) }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() }) @@ -1348,9 +1509,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow( new Error('Cannot remove a task function not handled on the pool side') @@ -1358,52 +1519,65 @@ describe('Abstract pool test suite', () => { const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await fixedClusterPool.destroy() }) @@ -1415,48 +1589,49 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) + const workerId = dynamicThreadPool.workerNodes[0].info.id await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 33 with error: 'TypeError: name parameter is not a string'" + `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME) ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function reserved name as the default task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction('unknown') ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function to a non-existing task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'` ) ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('factorial') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'factorial', - 'jsonIntegerSerialization', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'factorial' }, + { name: 'jsonIntegerSerialization' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('fibonacci') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fibonacci', - 'jsonIntegerSerialization', - 'factorial' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fibonacci' }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' } ]) await dynamicThreadPool.destroy() }) @@ -1465,7 +1640,76 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' + ) + const data = { n: 10 } + const result0 = await pool.execute(data) + expect(result0).toStrictEqual({ ok: 1 }) + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toStrictEqual({ ok: 1 }) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } + ]) + expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + failed: 0, + queued: 0, + sequentiallyStolen: 0, + stolen: 0 + }, + runTime: { + history: expect.any(CircularBuffer) + }, + waitTime: { + history: expect.any(CircularBuffer) + }, + elu: { + idle: { + history: expect.any(CircularBuffer) + }, + active: { + history: expect.any(CircularBuffer) + } + } + }) + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed + ).toBeGreaterThan(0) + } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionsProperties[1].name + ) + ) + } + await pool.destroy() + }) + + it('Verify that task function objects worker is working', async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs' ) const data = { n: 10 } const result0 = await pool.execute(data) @@ -1479,46 +1723,51 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctionNames()) { - expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, failed: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed ).toBeGreaterThan(0) } expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( workerNode.getTaskFunctionWorkerUsage( - workerNode.info.taskFunctionNames[1] + workerNode.info.taskFunctionsProperties[1].name ) ) } @@ -1529,7 +1778,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( @@ -1542,19 +1791,23 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( pool.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) expect( - pool.workerNodes[workerNodeKey].info.taskFunctionNames - ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty']) + pool.workerNodes[workerNodeKey].info.taskFunctionsProperties + ).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } + ]) await pool.destroy() }) @@ -1562,20 +1815,20 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) await expect( pool.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'empty' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } ]) } await pool.destroy()