X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract%2Fabstract-pool.test.js;h=aeede25d7149a9c5918626ae9e860e7733cea4b4;hb=d6ca14169b6d2e9ad12d438a7acca797ae683e7e;hp=d1c58ddbb73658f495df83f345c013f16c631132;hpb=85bb5e488cb5adb8fef806e1f3fd9f85c525713e;p=poolifier.git diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index d1c58ddb..aeede25d 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,3 +1,4 @@ +const { EventEmitter } = require('node:events') const { expect } = require('expect') const sinon = require('sinon') const { @@ -11,9 +12,11 @@ const { WorkerTypes } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') -const { Queue } = require('../../../lib/queue') +const { Deque } = require('../../../lib/deque') +const { DEFAULT_TASK_NAME } = require('../../../lib/utils') const { version } = require('../../../package.json') const { waitPoolEvents } = require('../../test-utils') +const { WorkerNode } = require('../../../lib/pools/worker-node') describe('Abstract pool test suite', () => { const numberOfWorkers = 2 @@ -23,6 +26,10 @@ describe('Abstract pool test suite', () => { } } + afterEach(() => { + sinon.restore() + }) + it('Simulate pool creation from a non main thread/process', () => { expect( () => @@ -30,14 +37,26 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js', { - errorHandler: (e) => console.error(e) + errorHandler: e => console.error(e) } ) ).toThrowError( - 'Cannot start a pool from a worker with the same type as the pool' + new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) ) }) + it('Verify that pool statuses properties are set', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js' + ) + expect(pool.starting).toBe(false) + expect(pool.started).toBe(true) + await pool.destroy() + }) + it('Verify that filePath is checked', () => { const expectedError = new Error( 'Please specify a file with a worker implementation' @@ -61,7 +80,9 @@ describe('Abstract pool test suite', () => { it('Verify that numberOfWorkers is checked', () => { expect(() => new FixedThreadPool()).toThrowError( - 'Cannot instantiate a pool without specifying the number of workers' + new Error( + 'Cannot instantiate a pool without specifying the number of workers' + ) ) }) @@ -134,22 +155,22 @@ describe('Abstract pool test suite', () => { ) expect( () => - new DynamicClusterPool( - 1, - 1, - './tests/worker-files/cluster/testWorker.js' - ) + new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js') ).toThrowError( new RangeError( - 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) ) expect( () => - new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js') + new DynamicClusterPool( + 1, + 1, + './tests/worker-files/cluster/testWorker.js' + ) ).toThrowError( new RangeError( - 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' + 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' ) ) }) @@ -159,30 +180,35 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js' ) - expect(pool.emitter).toBeDefined() - expect(pool.opts.enableEvents).toBe(true) - expect(pool.opts.restartWorkerOnError).toBe(true) - expect(pool.opts.enableTasksQueue).toBe(false) - expect(pool.opts.tasksQueueOptions).toBeUndefined() - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - choiceRetries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + expect(pool.emitter).toBeInstanceOf(EventEmitter) + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: true, + restartWorkerOnError: true, + enableTasksQueue: false, + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }) - expect(pool.opts.messageHandler).toBeUndefined() - expect(pool.opts.errorHandler).toBeUndefined() - expect(pool.opts.onlineHandler).toBeUndefined() - expect(pool.opts.exitHandler).toBeUndefined() + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + .workerChoiceStrategies) { + expect(workerChoiceStrategy.opts).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + } await pool.destroy() const testHandler = () => console.info('test handler executed') pool = new FixedThreadPool( @@ -205,31 +231,47 @@ describe('Abstract pool test suite', () => { } ) expect(pool.emitter).toBeUndefined() - expect(pool.opts.enableEvents).toBe(false) - expect(pool.opts.restartWorkerOnError).toBe(false) - expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LEAST_USED - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - choiceRetries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: false, + restartWorkerOnError: false, + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 2, + size: 4, + taskStealing: true, + tasksStealingOnBackPressure: true + }, + workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }, + onlineHandler: testHandler, + messageHandler: testHandler, + errorHandler: testHandler, + exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, weights: { 0: 300, 1: 200 } }) - expect(pool.opts.messageHandler).toStrictEqual(testHandler) - expect(pool.opts.errorHandler).toStrictEqual(testHandler) - expect(pool.opts.onlineHandler).toStrictEqual(testHandler) - expect(pool.opts.exitHandler).toStrictEqual(testHandler) + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + .workerChoiceStrategies) { + expect(workerChoiceStrategy.opts).toStrictEqual({ + retries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }) + } await pool.destroy() }) @@ -246,6 +288,38 @@ describe('Abstract pool test suite', () => { ).toThrowError( new Error("Invalid worker choice strategy 'invalidStrategy'") ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategyOptions: { + retries: 'invalidChoiceRetries' + } + } + ) + ).toThrowError( + new TypeError( + 'Invalid worker choice strategy options: retries must be an integer' + ) + ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + workerChoiceStrategyOptions: { + retries: -1 + } + } + ) + ).toThrowError( + new RangeError( + "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" + ) + ) expect( () => new FixedThreadPool( @@ -274,6 +348,19 @@ describe('Abstract pool test suite', () => { "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" ) ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: 'invalidTasksQueueOptions' + } + ) + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect( () => new FixedThreadPool( @@ -285,8 +372,8 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - new TypeError( - 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + new RangeError( + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) expect( @@ -296,11 +383,13 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js', { enableTasksQueue: true, - tasksQueueOptions: 'invalidTasksQueueOptions' + tasksQueueOptions: { concurrency: -1 } } ) ).toThrowError( - new TypeError('Invalid tasks queue options: must be a plain object') + new RangeError( + 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' + ) ) expect( () => @@ -313,7 +402,50 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - new TypeError('Invalid worker tasks concurrency: must be an integer') + new TypeError('Invalid worker node tasks concurrency: must be an integer') + ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { size: 0 } + } + ) + ).toThrowError( + new RangeError( + 'Invalid worker node tasks queue size: 0 is a negative integer or zero' + ) + ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { size: -1 } + } + ) + ).toThrowError( + new RangeError( + 'Invalid worker node tasks queue size: -1 is a negative integer or zero' + ) + ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true, + tasksQueueOptions: { size: 0.2 } + } + ) + ).toThrowError( + new TypeError('Invalid worker node tasks queue size: must be an integer') ) }) @@ -324,13 +456,13 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -338,7 +470,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -368,13 +500,13 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -382,7 +514,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -412,13 +544,13 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -426,7 +558,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - choiceRetries: 6, + retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -458,6 +590,22 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) + expect(() => + pool.setWorkerChoiceStrategyOptions({ + retries: 'invalidChoiceRetries' + }) + ).toThrowError( + new TypeError( + 'Invalid worker choice strategy options: retries must be an integer' + ) + ) + expect(() => + pool.setWorkerChoiceStrategyOptions({ retries: -1 }) + ).toThrowError( + new RangeError( + "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" + ) + ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} }) ).toThrowError( @@ -482,15 +630,41 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + size: 4, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } await pool.destroy() }) @@ -500,26 +674,76 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js', { enableTasksQueue: true } ) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) - pool.setTasksQueueOptions({ concurrency: 2 }) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } + pool.setTasksQueueOptions({ + concurrency: 2, + taskStealing: false, + tasksStealingOnBackPressure: false + }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + size: 4, + taskStealing: false, + tasksStealingOnBackPressure: false + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } + pool.setTasksQueueOptions({ + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') ).toThrowError( new TypeError('Invalid tasks queue options: must be a plain object') ) expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( - new Error( - 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + new RangeError( + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( - new Error( - 'Invalid worker tasks concurrency: -1 is a negative integer or zero' + new RangeError( + 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) ) expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( - new TypeError('Invalid worker tasks concurrency: must be an integer') + new TypeError('Invalid worker node tasks concurrency: must be an integer') + ) + expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue size: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue size: -1 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError( + new TypeError('Invalid worker node tasks queue size: must be an integer') ) await pool.destroy() }) @@ -533,6 +757,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: numberOfWorkers, @@ -554,6 +779,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: Math.floor(numberOfWorkers / 2), @@ -574,26 +800,28 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -607,8 +835,8 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() - expect(workerNode.tasksQueue).toBeInstanceOf(Queue) + expect(workerNode).toBeInstanceOf(WorkerNode) + expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } @@ -619,11 +847,12 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() - expect(workerNode.tasksQueue).toBeInstanceOf(Queue) + expect(workerNode).toBeInstanceOf(WorkerNode) + expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } + await pool.destroy() }) it('Verify that pool worker info are initialized', async () => { @@ -632,6 +861,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.cluster, @@ -646,6 +876,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.thread, @@ -653,6 +884,54 @@ describe('Abstract pool test suite', () => { ready: true }) } + await pool.destroy() + }) + + it('Verify that pool can be started after initialization', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + startWorkers: false + } + ) + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + expect(pool.workerNodes).toStrictEqual([]) + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') + ) + pool.start() + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(pool.workerNodes.length).toBe(numberOfWorkers) + for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) + } + await pool.destroy() + }) + + it('Verify that pool execute() arguments are checked', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + await expect(pool.execute(undefined, 0)).rejects.toThrowError( + new TypeError('name argument must be a string') + ) + await expect(pool.execute(undefined, '')).rejects.toThrowError( + new TypeError('name argument must not be an empty string') + ) + await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( + new TypeError('transferList argument must be an array') + ) + await expect(pool.execute(undefined, 'unknown')).rejects.toBe( + "Task function 'unknown' not found" + ) + await pool.destroy() + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') + ) }) it('Verify that pool worker tasks usage are computed', async () => { @@ -672,6 +951,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -698,6 +978,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -738,6 +1019,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -772,6 +1054,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -805,7 +1088,7 @@ describe('Abstract pool test suite', () => { ) let poolInfo let poolReady = 0 - pool.emitter.on(PoolEvents.ready, (info) => { + pool.emitter.on(PoolEvents.ready, info => { ++poolReady poolInfo = info }) @@ -815,6 +1098,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), @@ -837,7 +1121,7 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolBusy = 0 let poolInfo - pool.emitter.on(PoolEvents.busy, (info) => { + pool.emitter.on(PoolEvents.busy, info => { ++poolBusy poolInfo = info }) @@ -852,7 +1136,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -875,7 +1160,7 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolFull = 0 let poolInfo - pool.emitter.on(PoolEvents.full, (info) => { + pool.emitter.on(PoolEvents.full, info => { ++poolFull poolInfo = info }) @@ -888,7 +1173,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -914,20 +1200,21 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolBackPressure = 0 let poolInfo - pool.emitter.on(PoolEvents.backPressure, (info) => { + pool.emitter.on(PoolEvents.backPressure, info => { ++poolBackPressure poolInfo = info }) - for (let i = 0; i < numberOfWorkers * 2; i++) { + for (let i = 0; i < numberOfWorkers + 1; i++) { promises.add(pool.execute()) } await Promise.all(promises) - expect(poolBackPressure).toBe(2) + expect(poolBackPressure).toBe(1) expect(poolInfo).toStrictEqual({ version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -939,6 +1226,7 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: expect.any(Number), queuedTasks: expect.any(Number), backPressure: true, + stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) expect(pool.hasBackPressure.called).toBe(true) @@ -953,7 +1241,7 @@ describe('Abstract pool test suite', () => { ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([ - 'default', + DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' @@ -964,11 +1252,13 @@ describe('Abstract pool test suite', () => { ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([ - 'default', + DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' ]) + await dynamicThreadPool.destroy() + await fixedClusterPool.destroy() }) it('Verify that multiple task functions worker is working', async () => { @@ -990,7 +1280,7 @@ describe('Abstract pool test suite', () => { expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { expect(workerNode.info.taskFunctions).toStrictEqual([ - 'default', + DEFAULT_TASK_NAME, 'jsonIntegerSerialization', 'factorial', 'fibonacci' @@ -1000,9 +1290,10 @@ describe('Abstract pool test suite', () => { expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ tasks: { executed: expect.any(Number), - executing: expect.any(Number), + executing: 0, failed: 0, - queued: 0 + queued: 0, + stolen: 0 }, runTime: { history: expect.any(CircularArray) @@ -1020,9 +1311,31 @@ describe('Abstract pool test suite', () => { } }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executing - ).toBeGreaterThanOrEqual(0) + workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + ).toBeGreaterThan(0) } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1]) + ) } + await pool.destroy() + }) + + it('Verify sendKillMessageToWorker()', async () => { + const pool = new DynamicClusterPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + const workerNodeKey = 0 + await expect( + pool.sendKillMessageToWorker( + workerNodeKey, + pool.workerNodes[workerNodeKey].info.id + ) + ).resolves.toBeUndefined() + await pool.destroy() }) })