X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=edf03f4ae327b4f7c79fe9a1a9c9917c91de6e19;hb=c5033c2df4fef157ccba4adaada5794b32e83f7f;hp=09a21c9d1fb3a57fb654c5f29c44adf703c025ba;hpb=a074ffee1b46f43d0dcfba58128748c7492104dd;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 09a21c9d..edf03f4a 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1,5 +1,8 @@ import { EventEmitterAsyncResource } from 'node:events' +import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' +import { fileURLToPath } from 'node:url' +import { createHook, executionAsyncId } from 'node:async_hooks' import { expect } from 'expect' import { restore, stub } from 'sinon' import { @@ -19,7 +22,12 @@ import { waitPoolEvents } from '../test-utils.js' import { WorkerNode } from '../../lib/pools/worker-node.js' describe('Abstract pool test suite', () => { - const version = JSON.parse(readFileSync('./package.json', 'utf8')).version + const version = JSON.parse( + readFileSync( + join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'), + 'utf8' + ) + ).version const numberOfWorkers = 2 class StubPoolWithIsMain extends FixedThreadPool { isMain () { @@ -31,17 +39,26 @@ describe('Abstract pool test suite', () => { restore() }) - it('Simulate pool creation from a non main thread/process', () => { + it('Verify that pool can be created and destroyed', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool).toBeInstanceOf(FixedThreadPool) + await pool.destroy() + }) + + it('Verify that pool cannot be created from a non main thread/process', () => { expect( () => new StubPoolWithIsMain( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { errorHandler: e => console.error(e) } ) - ).toThrowError( + ).toThrow( new Error( 'Cannot start a pool from a worker with the same type as the pool' ) @@ -51,32 +68,21 @@ describe('Abstract pool test suite', () => { it('Verify that pool statuses properties are set', async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) - expect(pool.starting).toBe(false) expect(pool.started).toBe(true) + expect(pool.starting).toBe(false) + expect(pool.destroying).toBe(false) await pool.destroy() }) it('Verify that filePath is checked', () => { - const expectedError = new Error( - 'Please specify a file with a worker implementation' - ) - expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError( - expectedError + expect(() => new FixedThreadPool(numberOfWorkers)).toThrow( + new Error("Cannot find the worker file 'undefined'") ) expect( () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts') - ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'")) + ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'")) }) it('Verify that numberOfWorkers is checked', () => { @@ -84,9 +90,9 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( undefined, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new Error( 'Cannot instantiate a pool without specifying the number of workers' ) @@ -97,7 +103,7 @@ describe('Abstract pool test suite', () => { expect( () => new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js') - ).toThrowError( + ).toThrow( new RangeError( 'Cannot instantiate a pool with a negative number of workers' ) @@ -107,8 +113,8 @@ describe('Abstract pool test suite', () => { it('Verify that a non integer number of workers is checked', () => { expect( () => - new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js') - ).toThrowError( + new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs') + ).toThrow( new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) @@ -123,7 +129,7 @@ describe('Abstract pool test suite', () => { undefined, './tests/worker-files/cluster/testWorker.js' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) @@ -133,9 +139,9 @@ describe('Abstract pool test suite', () => { new DynamicThreadPool( 0.5, 1, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) @@ -147,23 +153,31 @@ describe('Abstract pool test suite', () => { 0.5, './tests/worker-files/cluster/testWorker.js' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' ) ) expect( () => - new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js') - ).toThrowError( + new DynamicThreadPool( + 2, + 1, + './tests/worker-files/thread/testWorker.mjs' + ) + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) ) expect( () => - new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js') - ).toThrowError( + new DynamicThreadPool( + 0, + 0, + './tests/worker-files/thread/testWorker.mjs' + ) + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) @@ -175,7 +189,7 @@ describe('Abstract pool test suite', () => { 1, './tests/worker-files/cluster/testWorker.js' ) - ).toThrowError( + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' ) @@ -185,7 +199,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool options are checked', async () => { let pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) expect(pool.opts).toStrictEqual({ @@ -220,7 +234,7 @@ describe('Abstract pool test suite', () => { const testHandler = () => console.info('test handler executed') pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { @@ -282,31 +296,29 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool options are validated', async () => { + it('Verify that pool options are validated', () => { expect( () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: 'invalidStrategy' } ) - ).toThrowError( - new Error("Invalid worker choice strategy 'invalidStrategy'") - ) + ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) expect( () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategyOptions: { retries: 'invalidChoiceRetries' } } ) - ).toThrowError( + ).toThrow( new TypeError( 'Invalid worker choice strategy options: retries must be an integer' ) @@ -315,14 +327,14 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategyOptions: { retries: -1 } } ) - ).toThrowError( + ).toThrow( new RangeError( "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" ) @@ -331,12 +343,12 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategyOptions: { weights: {} } } ) - ).toThrowError( + ).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' ) @@ -345,12 +357,12 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } } ) - ).toThrowError( + ).toThrow( new Error( "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" ) @@ -359,26 +371,26 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid tasks queue options: must be a plain object') ) expect( () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) @@ -387,13 +399,13 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { concurrency: -1 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) @@ -402,26 +414,26 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) expect( () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { size: 0 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' ) @@ -430,13 +442,13 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { size: -1 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks queue size: -1 is a negative integer or zero' ) @@ -445,13 +457,13 @@ describe('Abstract pool test suite', () => { () => new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, tasksQueueOptions: { size: 0.2 } } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) }) @@ -459,7 +471,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker choice strategy options can be set', async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ @@ -592,7 +604,7 @@ describe('Abstract pool test suite', () => { }) expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') - ).toThrowError( + ).toThrow( new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) @@ -601,28 +613,24 @@ describe('Abstract pool test suite', () => { pool.setWorkerChoiceStrategyOptions({ retries: 'invalidChoiceRetries' }) - ).toThrowError( + ).toThrow( new TypeError( 'Invalid worker choice strategy options: retries must be an integer' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ retries: -1 }) - ).toThrowError( + expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( new RangeError( "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ weights: {} }) - ).toThrowError( + expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' }) - ).toThrowError( + ).toThrow( new Error( "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" ) @@ -633,14 +641,10 @@ describe('Abstract pool test suite', () => { it('Verify that pool tasks queue can be enabled/disabled', async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -649,10 +653,6 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -661,24 +661,16 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } await pool.destroy() }) it('Verify that pool tasks queue options can be set', async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true } ) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -691,8 +683,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } pool.setTasksQueueOptions({ concurrency: 2, @@ -710,8 +700,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() } pool.setTasksQueueOptions({ concurrency: 1, @@ -728,38 +716,34 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } - expect(() => - pool.setTasksQueueOptions('invalidTasksQueueOptions') - ).toThrowError( + expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow( new TypeError('Invalid tasks queue options: must be a plain object') ) - expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow( new RangeError( 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow( new RangeError( 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow( new RangeError( 'Invalid worker node tasks queue size: -1 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) await pool.destroy() @@ -768,7 +752,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool info is set', async () => { let pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) expect(pool.info).toStrictEqual({ version, @@ -824,6 +808,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -861,7 +846,7 @@ describe('Abstract pool test suite', () => { pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -890,7 +875,7 @@ describe('Abstract pool test suite', () => { pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -904,6 +889,24 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool statuses are checked at start or destroy', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(() => pool.start()).toThrow( + new Error('Cannot start an already started pool') + ) + await pool.destroy() + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + await expect(pool.destroy()).rejects.toThrow( + new Error('Cannot destroy an already destroyed pool') + ) + }) + it('Verify that pool can be started after initialization', async () => { const pool = new FixedClusterPool( numberOfWorkers, @@ -914,13 +917,16 @@ describe('Abstract pool test suite', () => { ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) + expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) - await expect(pool.execute()).rejects.toThrowError( + await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) + await waitPoolEvents(pool, PoolEvents.ready, 1) + expect(pool.readyEventEmitted).toBe(true) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -933,20 +939,20 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/cluster/testWorker.js' ) - await expect(pool.execute(undefined, 0)).rejects.toThrowError( + await expect(pool.execute(undefined, 0)).rejects.toThrow( new TypeError('name argument must be a string') ) - await expect(pool.execute(undefined, '')).rejects.toThrowError( + await expect(pool.execute(undefined, '')).rejects.toThrow( new TypeError('name argument must not be an empty string') ) - await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( + await expect(pool.execute(undefined, undefined, {})).rejects.toThrow( new TypeError('transferList argument must be an array') ) await expect(pool.execute(undefined, 'unknown')).rejects.toBe( "Task function 'unknown' not found" ) await pool.destroy() - await expect(pool.execute()).rejects.toThrowError( + await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) }) @@ -968,6 +974,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -995,6 +1002,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1021,7 +1029,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) const promises = new Set() const maxMultiplier = 2 @@ -1036,6 +1044,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1071,6 +1080,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1135,7 +1145,7 @@ describe('Abstract pool test suite', () => { it("Verify that pool event emitter 'busy' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() @@ -1176,7 +1186,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter.eventNames()).toStrictEqual([]) const promises = new Set() @@ -1214,7 +1224,7 @@ describe('Abstract pool test suite', () => { it("Verify that pool event emitter 'backPressure' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js', + './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true } @@ -1258,11 +1268,48 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool asynchronous resource track tasks execution', async () => { + let taskAsyncId + let initCalls = 0 + let beforeCalls = 0 + let afterCalls = 0 + let resolveCalls = 0 + const hook = createHook({ + init (asyncId, type) { + if (type === 'poolifier:task') { + initCalls++ + taskAsyncId = asyncId + } + }, + before (asyncId) { + if (asyncId === taskAsyncId) beforeCalls++ + }, + after (asyncId) { + if (asyncId === taskAsyncId) afterCalls++ + }, + promiseResolve () { + if (executionAsyncId() === taskAsyncId) resolveCalls++ + } + }) + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + hook.enable() + await pool.execute() + hook.disable() + expect(initCalls).toBe(1) + expect(beforeCalls).toBe(1) + expect(afterCalls).toBe(1) + expect(resolveCalls).toBe(1) + await pool.destroy() + }) + it('Verify that hasTaskFunction() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) @@ -1292,23 +1339,23 @@ describe('Abstract pool test suite', () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) await expect( dynamicThreadPool.addTaskFunction(0, () => {}) - ).rejects.toThrowError(new TypeError('name argument must be a string')) + ).rejects.toThrow(new TypeError('name argument must be a string')) await expect( dynamicThreadPool.addTaskFunction('', () => {}) - ).rejects.toThrowError( + ).rejects.toThrow( new TypeError('name argument must not be an empty string') ) - await expect( - dynamicThreadPool.addTaskFunction('test', 0) - ).rejects.toThrowError(new TypeError('fn argument must be a function')) - await expect( - dynamicThreadPool.addTaskFunction('test', '') - ).rejects.toThrowError(new TypeError('fn argument must be a function')) + await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow( + new TypeError('fn argument must be a function') + ) + await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( + new TypeError('fn argument must be a function') + ) expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'test' @@ -1337,6 +1384,7 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1363,16 +1411,14 @@ describe('Abstract pool test suite', () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ DEFAULT_TASK_NAME, 'test' ]) - await expect( - dynamicThreadPool.removeTaskFunction('test') - ).rejects.toThrowError( + await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow( new Error('Cannot remove a task function not handled on the pool side') ) const echoTaskFunction = data => { @@ -1404,7 +1450,7 @@ describe('Abstract pool test suite', () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ @@ -1432,28 +1478,27 @@ describe('Abstract pool test suite', () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - await expect( - dynamicThreadPool.setDefaultTaskFunction(0) - ).rejects.toThrowError( + const workerId = dynamicThreadPool.workerNodes[0].info.id + await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'" + `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME) - ).rejects.toThrowError( + ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction('unknown') - ).rejects.toThrowError( + ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'` ) ) expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ @@ -1480,6 +1525,7 @@ describe('Abstract pool test suite', () => { 'jsonIntegerSerialization', 'factorial' ]) + await dynamicThreadPool.destroy() }) it('Verify that multiple task functions worker is working', async () => { @@ -1514,6 +1560,7 @@ describe('Abstract pool test suite', () => { executing: 0, failed: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0 }, runTime: {