X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=ecb1c5670e095a4f89678f8a511a99721084d3ea;hb=5993cfc5a031fd2d64f644505db125bc1d28b05b;hp=b4b9d706d0a10c9fa3fbb5ff1a24c1af81f46624;hpb=14d5e18329a40faffe8c95a9323d72872efdc3de;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index b4b9d706..ecb1c567 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1,10 +1,14 @@ +// eslint-disable-next-line n/no-unsupported-features/node-builtins +import { createHook, executionAsyncId } from 'node:async_hooks' import { EventEmitterAsyncResource } from 'node:events' -import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' +import { dirname, join } from 'node:path' import { fileURLToPath } from 'node:url' -import { createHook, executionAsyncId } from 'node:async_hooks' + import { expect } from 'expect' import { restore, stub } from 'sinon' + +import { CircularBuffer } from '../../lib/circular-buffer.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -13,13 +17,12 @@ import { PoolEvents, PoolTypes, WorkerChoiceStrategies, - WorkerTypes + WorkerTypes, } from '../../lib/index.cjs' -import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' -import { WorkerNode } from '../../lib/pools/worker-node.cjs' describe('Abstract pool test suite', () => { const version = JSON.parse( @@ -55,7 +58,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', { - errorHandler: e => console.error(e) + errorHandler: e => console.error(e), } ) ).toThrow( @@ -221,14 +224,15 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -236,8 +240,8 @@ describe('Abstract pool test suite', () => { elu: { median: false }, weights: expect.objectContaining({ 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) + [pool.info.maxSize - 1]: expect.any(Number), + }), }) } await pool.destroy() @@ -249,7 +253,7 @@ describe('Abstract pool test suite', () => { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { runTime: { median: true }, - weights: { 0: 300, 1: 200 } + weights: { 0: 300, 1: 200 }, }, enableEvents: false, restartWorkerOnError: false, @@ -258,7 +262,7 @@ describe('Abstract pool test suite', () => { messageHandler: testHandler, errorHandler: testHandler, onlineHandler: testHandler, - exitHandler: testHandler + exitHandler: testHandler, } ) expect(pool.emitter).toBeUndefined() @@ -271,26 +275,26 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000, }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { runTime: { median: true }, - weights: { 0: 300, 1: 200 } + weights: { 0: 300, 1: 200 }, }, onlineHandler: testHandler, messageHandler: testHandler, errorHandler: testHandler, - exitHandler: testHandler + exitHandler: testHandler, }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, - weights: { 0: 300, 1: 200 } + weights: { 0: 300, 1: 200 }, }) } await pool.destroy() @@ -303,7 +307,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', { - workerChoiceStrategy: 'invalidStrategy' + workerChoiceStrategy: 'invalidStrategy', } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) @@ -313,7 +317,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', { - workerChoiceStrategyOptions: { weights: {} } + workerChoiceStrategyOptions: { weights: {} }, } ) ).toThrow( @@ -327,7 +331,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', { - workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } + workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }, } ) ).toThrow( @@ -342,7 +346,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: 'invalidTasksQueueOptions' + tasksQueueOptions: 'invalidTasksQueueOptions', } ) ).toThrow( @@ -355,7 +359,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { concurrency: 0 } + tasksQueueOptions: { concurrency: 0 }, } ) ).toThrow( @@ -370,7 +374,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { concurrency: -1 } + tasksQueueOptions: { concurrency: -1 }, } ) ).toThrow( @@ -385,7 +389,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { concurrency: 0.2 } + tasksQueueOptions: { concurrency: 0.2 }, } ) ).toThrow( @@ -398,7 +402,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { size: 0 } + tasksQueueOptions: { size: 0 }, } ) ).toThrow( @@ -413,7 +417,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { size: -1 } + tasksQueueOptions: { size: -1 }, } ) ).toThrow( @@ -428,7 +432,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { size: 0.2 } + tasksQueueOptions: { size: 0.2 }, } ) ).toThrow( @@ -443,7 +447,7 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -451,38 +455,38 @@ describe('Abstract pool test suite', () => { elu: { median: false }, weights: expect.objectContaining({ 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) + [pool.info.maxSize - 1]: expect.any(Number), + }), }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, average: true, - median: false + median: false, }, waitTime: { - aggregate: false, - average: false, - median: false + aggregate: true, + average: true, + median: false, }, elu: { aggregate: true, average: true, - median: false - } + median: false, + }, }) pool.setWorkerChoiceStrategyOptions({ runTime: { median: true }, - elu: { median: true } + elu: { median: true }, }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ runTime: { median: true }, - elu: { median: true } + elu: { median: true }, }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -490,38 +494,38 @@ describe('Abstract pool test suite', () => { elu: { median: true }, weights: expect.objectContaining({ 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) + [pool.info.maxSize - 1]: expect.any(Number), + }), }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, average: false, - median: true + median: true, }, waitTime: { - aggregate: false, - average: false, - median: false + aggregate: true, + average: true, + median: false, }, elu: { aggregate: true, average: false, - median: true - } + median: true, + }, }) pool.setWorkerChoiceStrategyOptions({ runTime: { median: false }, - elu: { median: false } + elu: { median: false }, }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ runTime: { median: false }, - elu: { median: false } + elu: { median: false }, }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -529,28 +533,28 @@ describe('Abstract pool test suite', () => { elu: { median: false }, weights: expect.objectContaining({ 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) + [pool.info.maxSize - 1]: expect.any(Number), + }), }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, average: true, - median: false + median: false, }, waitTime: { - aggregate: false, - average: false, - median: false + aggregate: true, + average: true, + median: false, }, elu: { aggregate: true, average: true, - median: false - } + median: false, + }, }) expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') @@ -587,8 +591,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000, }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -596,8 +600,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000, }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -615,8 +619,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 2000, }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -628,14 +632,14 @@ describe('Abstract pool test suite', () => { size: 2, taskStealing: false, tasksStealingOnBackPressure: false, - tasksFinishedTimeout: 3000 + tasksFinishedTimeout: 3000, }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, tasksStealingOnBackPressure: false, - tasksFinishedTimeout: 3000 + tasksFinishedTimeout: 3000, }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -645,14 +649,14 @@ describe('Abstract pool test suite', () => { pool.setTasksQueueOptions({ concurrency: 1, taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000 + tasksFinishedTimeout: 2000, }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -702,7 +706,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, workerNodes: numberOfWorkers, @@ -710,7 +715,7 @@ describe('Abstract pool test suite', () => { busyWorkerNodes: 0, executedTasks: 0, executingTasks: 0, - failedTasks: 0 + failedTasks: 0, }) await pool.destroy() pool = new DynamicClusterPool( @@ -724,7 +729,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, workerNodes: Math.floor(numberOfWorkers / 2), @@ -732,7 +738,7 @@ describe('Abstract pool test suite', () => { busyWorkerNodes: 0, executedTasks: 0, executingTasks: 0, - failedTasks: 0 + failedTasks: 0, }) await pool.destroy() }) @@ -752,22 +758,22 @@ describe('Abstract pool test suite', () => { maxQueued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer), }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer), }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer), }, active: { - history: new CircularArray() - } - } + history: expect.any(CircularBuffer), + }, + }, }) } await pool.destroy() @@ -780,9 +786,11 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() pool = new DynamicThreadPool( @@ -792,9 +800,11 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() }) @@ -811,7 +821,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false, }) } await pool.destroy() @@ -827,7 +838,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false, }) } await pool.destroy() @@ -856,13 +868,13 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/cluster/testWorker.cjs', { - startWorkers: false + startWorkers: false, } ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) - expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) @@ -920,22 +932,22 @@ describe('Abstract pool test suite', () => { maxQueued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, active: { - history: expect.any(CircularArray) - } - } + history: expect.any(CircularBuffer), + }, + }, }) } await Promise.all(promises) @@ -948,28 +960,28 @@ describe('Abstract pool test suite', () => { maxQueued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, active: { - history: expect.any(CircularArray) - } - } + history: expect.any(CircularBuffer), + }, + }, }) } await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -990,63 +1002,59 @@ describe('Abstract pool test suite', () => { maxQueued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, active: { - history: expect.any(CircularArray) - } - } + history: expect.any(CircularBuffer), + }, + }, }) expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier ) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, active: { - history: expect.any(CircularArray) - } - } + history: expect.any(CircularBuffer), + }, + }, }) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) } await pool.destroy() }) @@ -1073,7 +1081,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1081,7 +1090,7 @@ describe('Abstract pool test suite', () => { busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), - failedTasks: expect.any(Number) + failedTasks: expect.any(Number), }) await pool.destroy() }) @@ -1113,7 +1122,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1121,7 +1131,7 @@ describe('Abstract pool test suite', () => { busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), - failedTasks: expect.any(Number) + failedTasks: expect.any(Number), }) await pool.destroy() }) @@ -1152,7 +1162,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1160,7 +1171,7 @@ describe('Abstract pool test suite', () => { busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), - failedTasks: expect.any(Number) + failedTasks: expect.any(Number), }) await pool.destroy() }) @@ -1170,7 +1181,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs', { - enableTasksQueue: true + enableTasksQueue: true, } ) stub(pool, 'hasBackPressure').returns(true) @@ -1194,7 +1205,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1207,7 +1219,7 @@ describe('Abstract pool test suite', () => { queuedTasks: expect.any(Number), backPressure: true, stolenTasks: expect.any(Number), - failedTasks: expect.any(Number) + failedTasks: expect.any(Number), }) expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) await pool.destroy() @@ -1220,7 +1232,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/asyncWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { tasksFinishedTimeout } + tasksQueueOptions: { tasksFinishedTimeout }, } ) const maxMultiplier = 4 @@ -1249,7 +1261,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/asyncWorker.mjs', { enableTasksQueue: true, - tasksQueueOptions: { tasksFinishedTimeout } + tasksQueueOptions: { tasksFinishedTimeout }, } ) const maxMultiplier = 4 @@ -1291,7 +1303,7 @@ describe('Abstract pool test suite', () => { }, promiseResolve () { if (executionAsyncId() === taskAsyncId) resolveCalls++ - } + }, }) const pool = new FixedThreadPool( numberOfWorkers, @@ -1353,29 +1365,72 @@ describe('Abstract pool test suite', () => { new TypeError('name argument must not be an empty string') ) await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') ) await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: '' }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: -21, + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: 20, + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + strategy: 'invalidStrategy', + }) + ).rejects.toThrow( + new Error("Invalid worker choice strategy 'invalidStrategy'") ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(), + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU, + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU, + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(), + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU, + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }, ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1388,23 +1443,63 @@ describe('Abstract pool test suite', () => { queued: 0, sequentiallyStolen: 0, stolen: 0, - failed: 0 + failed: 0, }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer), }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer), }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularBuffer), + }), + active: expect.objectContaining({ + history: expect.any(CircularBuffer), + }), + }), }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() }) @@ -1416,9 +1511,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, ]) await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow( new Error('Cannot remove a task function not handled on the pool side') @@ -1426,40 +1521,53 @@ describe('Abstract pool test suite', () => { const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU, + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU, + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(), + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU, + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }, ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(), + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, ]) await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' }, ]) await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( @@ -1467,11 +1575,11 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' }, ]) await fixedClusterPool.destroy() }) @@ -1503,29 +1611,29 @@ describe('Abstract pool test suite', () => { `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'` ) ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' }, ]) await expect( dynamicThreadPool.setDefaultTaskFunction('factorial') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'factorial', - 'jsonIntegerSerialization', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'factorial' }, + { name: 'jsonIntegerSerialization' }, + { name: 'fibonacci' }, ]) await expect( dynamicThreadPool.setDefaultTaskFunction('fibonacci') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fibonacci', - 'jsonIntegerSerialization', - 'factorial' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fibonacci' }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, ]) await dynamicThreadPool.destroy() }) @@ -1548,47 +1656,160 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' }, ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctionNames()) { - expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(false) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, failed: 0, queued: 0, sequentiallyStolen: 0, - stolen: 0 + stolen: 0, }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer), }, active: { - history: expect.any(CircularArray) - } - } + history: expect.any(CircularBuffer), + }, + }, }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed ).toBeGreaterThan(0) } expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( workerNode.getTaskFunctionWorkerUsage( - workerNode.info.taskFunctionNames[1] + workerNode.info.taskFunctionsProperties[1].name + ) + ) + } + await pool.destroy() + }) + + it('Verify that mapExecute() is working', async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' + ) + expect(() => pool.mapExecute()).toThrow(new TypeError('data argument must be a defined iterable')) + expect(() => pool.mapExecute(0)).toThrow(new TypeError('data argument must be an iterable')) + let results = await pool.mapExecute([{}, {}, {}, {}]) + expect(results).toStrictEqual([ + { ok: 1 }, + { ok: 1 }, + { ok: 1 }, + { ok: 1 }, + ]) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + results = await pool.mapExecute([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }], 'factorial') + expect(results).toStrictEqual([ + 3628800, + 2432902008176640000, + 2.6525285981219103e+32, + 8.159152832478977e+47, + ]) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(8) + results = await pool.mapExecute(new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]), 'factorial') + expect(results).toStrictEqual([ + 3628800, + 2432902008176640000, + 2.6525285981219103e+32, + 8.159152832478977e+47, + ]) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(12) + await pool.destroy() + }) + + it('Verify that task function objects worker is working', async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs' + ) + const data = { n: 10 } + const result0 = await pool.execute(data) + expect(result0).toStrictEqual({ ok: 1 }) + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toStrictEqual({ ok: 1 }) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci', priority: -5 }, + ]) + expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(true) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + failed: 0, + queued: 0, + sequentiallyStolen: 0, + stolen: 0, + }, + runTime: { + history: expect.any(CircularBuffer), + }, + waitTime: { + history: expect.any(CircularBuffer), + }, + elu: { + idle: { + history: expect.any(CircularBuffer), + }, + active: { + history: expect.any(CircularBuffer), + }, + }, + }) + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed + ).toBeGreaterThan(0) + } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionsProperties[1].name ) ) } @@ -1618,13 +1839,17 @@ describe('Abstract pool test suite', () => { await expect( pool.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName: 'empty', - taskFunction: (() => {}).toString() + taskFunctionProperties: { name: 'empty' }, + taskFunction: (() => {}).toString(), }) ).resolves.toBe(true) expect( - pool.workerNodes[workerNodeKey].info.taskFunctionNames - ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty']) + pool.workerNodes[workerNodeKey].info.taskFunctionsProperties + ).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' }, + ]) await pool.destroy() }) @@ -1637,15 +1862,15 @@ describe('Abstract pool test suite', () => { await expect( pool.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: 'empty', - taskFunction: (() => {}).toString() + taskFunctionProperties: { name: 'empty' }, + taskFunction: (() => {}).toString(), }) ).resolves.toBe(true) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'empty' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' }, ]) } await pool.destroy()