From 4735284627d276d3532cb3bd4b6d7fe392bb6b8b Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 16 Sep 2023 20:48:42 +0200 Subject: [PATCH] feat: add new pool options MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 2 + benchmarks/internal/bench.mjs | 59 +++++--- docs/api.md | 11 +- src/pools/abstract-pool.ts | 68 +++++---- src/pools/pool.ts | 27 +++- src/pools/worker-node.ts | 32 ++++- tests/pools/abstract/abstract-pool.test.js | 155 ++++++++++++--------- 7 files changed, 224 insertions(+), 130 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87f4a866..6fec036f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not. +- Add `tasksStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable tasks stealing or not and whether enable tasks stealing on back pressure or not. - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench. ## [2.6.44] - 2023-09-08 diff --git a/benchmarks/internal/bench.mjs b/benchmarks/internal/bench.mjs index 23ab0625..258854f8 100644 --- a/benchmarks/internal/bench.mjs +++ b/benchmarks/internal/bench.mjs @@ -13,35 +13,52 @@ import { runPoolifierTest } from '../benchmarks-utils.mjs' +const poolifierSuite = new Benchmark.Suite('Poolifier', { + onCycle: event => { + console.info(event.target.toString()) + }, + onComplete: function () { + console.info( + 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name')) + ) + } +}) + const poolSize = availableParallelism() -const pools = [] +const benchmarkSettings = [] for (const poolType of Object.values(PoolTypes)) { for (const workerType of Object.values(WorkerTypes)) { if (workerType === WorkerTypes.cluster) { continue } for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { - for (const enableTasksQueue of [false]) { + for (const enableTasksQueue of [false, true]) { if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) { for (const measurement of [Measurements.runTime, Measurements.elu]) { - pools.push([ + benchmarkSettings.push([ `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}|measurement:${measurement}`, - buildPoolifierPool(workerType, poolType, poolSize, { + workerType, + poolType, + poolSize, + { workerChoiceStrategy, workerChoiceStrategyOptions: { measurement }, enableTasksQueue - }) + } ]) } } else { - pools.push([ + benchmarkSettings.push([ `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}`, - buildPoolifierPool(workerType, poolType, poolSize, { + workerType, + poolType, + poolSize, + { workerChoiceStrategy, enableTasksQueue - }) + } ]) } } @@ -55,25 +72,21 @@ const workerData = { taskSize: 100 } -const suite = new Benchmark.Suite('Poolifier') -for (const [name, pool] of pools) { - suite.add(name, async () => { +for (const [ + name, + workerType, + poolType, + poolSize, + poolOptions +] of benchmarkSettings) { + poolifierSuite.add(name, async () => { + const pool = buildPoolifierPool(workerType, poolType, poolSize, poolOptions) await runPoolifierTest(pool, { taskExecutions, workerData }) + await pool.destroy() }) } -suite - .on('cycle', event => { - console.info(event.target.toString()) - }) - .on('complete', function () { - console.info( - 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name')) - ) - // eslint-disable-next-line n/no-process-exit - process.exit() - }) - .run({ async: true, maxTime: 120 }) +poolifierSuite.run({ async: true }) diff --git a/docs/api.md b/docs/api.md index 07133926..5a5ffd49 100644 --- a/docs/api.md +++ b/docs/api.md @@ -6,6 +6,7 @@ - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts) - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts) - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) + - [`pool.start()`](#poolstart) - [`pool.destroy()`](#pooldestroy) - [`pool.listTaskFunctions()`](#poollisttaskfunctions) - [`PoolOptions`](#pooloptions) @@ -42,6 +43,10 @@ This method is available on both pool implementations and returns a promise with the task function execution response. +### `pool.start()` + +This method is available on both pool implementations and will start the minimum number of workers. + ### `pool.destroy()` This method is available on both pool implementations and will call the terminate method on each worker. @@ -58,6 +63,8 @@ An object with these properties: - `messageHandler` (optional) - A function that will listen for message event on each worker - `errorHandler` (optional) - A function that will listen for error event on each worker - `exitHandler` (optional) - A function that will listen for exit event on each worker +- `startWorkers` (optional) - Start the minimum number of workers at pool creation. + Default: `true` - `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool: - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion @@ -95,8 +102,10 @@ An object with these properties: - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer. - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer. + - `tasksStealing` (optional) - Tasks stealing enablement. + - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure. - Default: `{ size: (pool maximum size)^2, concurrency: 1 }` + Default: `{ size: (pool maximum size)^2, concurrency: 1, tasksStealing: true, tasksStealingOnBackPressure: true }` #### `ThreadPoolOptions extends PoolOptions` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6457b155..69df163b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,14 +92,14 @@ export abstract class AbstractPool< */ protected readonly max?: number - /** - * Whether the pool is starting or not. - */ - private readonly starting: boolean /** * Whether the pool is started or not. */ private started: boolean + /** + * Whether the pool is starting or not. + */ + private starting: boolean /** * The start timestamp of the pool. */ @@ -145,10 +145,11 @@ export abstract class AbstractPool< this.setupHook() - this.starting = true - this.startPool() + this.started = false this.starting = false - this.started = true + if (this.opts.startWorkers === true) { + this.start() + } this.startTimestamp = performance.now() } @@ -212,6 +213,7 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { + this.opts.startWorkers = opts.startWorkers ?? true this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) @@ -314,11 +316,6 @@ export abstract class AbstractPool< `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` ) } - if (tasksQueueOptions?.queueMaxSize != null) { - throw new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - } if ( tasksQueueOptions?.size != null && !Number.isSafeInteger(tasksQueueOptions?.size) @@ -334,24 +331,13 @@ export abstract class AbstractPool< } } - private startPool (): void { - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.numberOfWorkers - ) { - this.createAndSetupWorkerNode() - } - } - /** @inheritDoc */ public get info (): PoolInfo { return { version, type: this.type, worker: this.worker, + started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, @@ -675,7 +661,9 @@ export abstract class AbstractPool< return { ...{ size: Math.pow(this.maxSize, 2), - concurrency: 1 + concurrency: 1, + tasksStealing: true, + tasksStealingOnBackPressure: true }, ...tasksQueueOptions } @@ -751,7 +739,7 @@ export abstract class AbstractPool< ): Promise { return await new Promise((resolve, reject) => { if (!this.started) { - reject(new Error('Cannot execute a task on destroyed pool')) + reject(new Error('Cannot execute a task on not started pool')) return } if (name != null && typeof name !== 'string') { @@ -798,6 +786,22 @@ export abstract class AbstractPool< }) } + /** @inheritdoc */ + public start (): void { + this.starting = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.numberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.starting = false + this.started = true + } + /** @inheritDoc */ public async destroy (): Promise { await Promise.all( @@ -1166,10 +1170,14 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { - this.workerNodes[workerNodeKey].onEmptyQueue = - this.taskStealingOnEmptyQueue.bind(this) - this.workerNodes[workerNodeKey].onBackPressure = - this.tasksStealingOnBackPressure.bind(this) + if (this.opts.tasksQueueOptions?.tasksStealing === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + } + if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 7a103733..f76be0c0 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -63,6 +63,7 @@ export interface PoolInfo { readonly version: string readonly type: PoolType readonly worker: WorkerType + readonly started: boolean readonly ready: boolean readonly strategy: WorkerChoiceStrategy readonly minSize: number @@ -106,16 +107,24 @@ export interface TasksQueueOptions { * @defaultValue (pool maximum size)^2 */ readonly size?: number - /** - * @deprecated Use `size` instead. - */ - readonly queueMaxSize?: number /** * Maximum number of tasks that can be executed concurrently on a worker node. * * @defaultValue 1 */ readonly concurrency?: number + /** + * Whether to enable tasks stealing. + * + * @defaultValue true + */ + readonly tasksStealing?: boolean + /** + * Whether to enable tasks stealing on back pressure. + * + * @defaultValue true + */ + readonly tasksStealingOnBackPressure?: boolean } /** @@ -140,6 +149,12 @@ export interface PoolOptions { * A function that will listen for exit event on each worker. */ exitHandler?: ExitHandler + /** + * Whether to start the minimum number of workers at pool initialization. + * + * @defaultValue false + */ + startWorkers?: boolean /** * The worker choice strategy to use in this pool. * @@ -229,6 +244,10 @@ export interface IPool< name?: string, transferList?: TransferListItem[] ) => Promise + /** + * Starts the minimum number of workers in this pool. + */ + readonly start: () => void /** * Terminates all workers in this pool. */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index ca275ded..68c8cab2 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -46,6 +46,7 @@ implements IWorkerNode { /** @inheritdoc */ public onEmptyQueue?: WorkerNodeEventCallback private readonly tasksQueue: Deque> + private onBackPressureStarted: boolean private onEmptyQueueCount: number private readonly taskFunctionsUsage: Map @@ -65,6 +66,7 @@ implements IWorkerNode { } this.tasksQueueBackPressureSize = tasksQueueBackPressureSize this.tasksQueue = new Deque>() + this.onBackPressureStarted = false this.onEmptyQueueCount = 0 this.taskFunctionsUsage = new Map() } @@ -77,8 +79,14 @@ implements IWorkerNode { /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.push(task) - if (this.onBackPressure != null && this.hasBackPressure()) { + if ( + this.onBackPressure != null && + this.hasBackPressure() && + !this.onBackPressureStarted + ) { + this.onBackPressureStarted = true this.onBackPressure(this.info.id as number) + this.onBackPressureStarted = false } return tasksQueueSize } @@ -86,8 +94,14 @@ implements IWorkerNode { /** @inheritdoc */ public unshiftTask (task: Task): number { const tasksQueueSize = this.tasksQueue.unshift(task) - if (this.onBackPressure != null && this.hasBackPressure()) { + if ( + this.onBackPressure != null && + this.hasBackPressure() && + !this.onBackPressureStarted + ) { + this.onBackPressureStarted = true this.onBackPressure(this.info.id as number) + this.onBackPressureStarted = false } return tasksQueueSize } @@ -95,7 +109,11 @@ implements IWorkerNode { /** @inheritdoc */ public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() - if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + if ( + this.onEmptyQueue != null && + this.tasksQueue.size === 0 && + this.onEmptyQueueCount === 0 + ) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -104,7 +122,11 @@ implements IWorkerNode { /** @inheritdoc */ public popTask (): Task | undefined { const task = this.tasksQueue.pop() - if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + if ( + this.onEmptyQueue != null && + this.tasksQueue.size === 0 && + this.onEmptyQueueCount === 0 + ) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -169,8 +191,8 @@ implements IWorkerNode { this.onEmptyQueueCount = 0 return } - (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number) ++this.onEmptyQueueCount + this.onEmptyQueue?.(this.info.id as number) await sleep(exponentialDelay(this.onEmptyQueueCount)) await this.startOnEmptyQueue() } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index bedb1c6c..df109fe1 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -16,6 +16,7 @@ const { Deque } = require('../../../lib/deque') const { DEFAULT_TASK_NAME } = require('../../../lib/utils') const { version } = require('../../../package.json') const { waitPoolEvents } = require('../../test-utils') +const { WorkerNode } = require('../../../lib/pools/worker-node') describe('Abstract pool test suite', () => { const numberOfWorkers = 2 @@ -180,18 +181,18 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) expect(pool.emitter).toBeInstanceOf(EventEmitter) - expect(pool.opts.enableEvents).toBe(true) - expect(pool.opts.restartWorkerOnError).toBe(true) - expect(pool.opts.enableTasksQueue).toBe(false) - expect(pool.opts.tasksQueueOptions).toBeUndefined() - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: true, + restartWorkerOnError: true, + enableTasksQueue: false, + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ retries: 6, @@ -208,10 +209,6 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) } - expect(pool.opts.messageHandler).toBeUndefined() - expect(pool.opts.errorHandler).toBeUndefined() - expect(pool.opts.onlineHandler).toBeUndefined() - expect(pool.opts.exitHandler).toBeUndefined() await pool.destroy() const testHandler = () => console.info('test handler executed') pool = new FixedThreadPool( @@ -234,22 +231,29 @@ describe('Abstract pool test suite', () => { } ) expect(pool.emitter).toBeUndefined() - expect(pool.opts.enableEvents).toBe(false) - expect(pool.opts.restartWorkerOnError).toBe(false) - expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ - concurrency: 2, - size: 4 - }) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LEAST_USED - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: false, + restartWorkerOnError: false, + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 2, + size: 4, + tasksStealing: true, + tasksStealingOnBackPressure: true + }, + workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }, + onlineHandler: testHandler, + messageHandler: testHandler, + errorHandler: testHandler, + exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ retries: 6, @@ -268,10 +272,6 @@ describe('Abstract pool test suite', () => { weights: { 0: 300, 1: 200 } }) } - expect(pool.opts.messageHandler).toStrictEqual(testHandler) - expect(pool.opts.errorHandler).toStrictEqual(testHandler) - expect(pool.opts.onlineHandler).toStrictEqual(testHandler) - expect(pool.opts.exitHandler).toStrictEqual(testHandler) await pool.destroy() }) @@ -404,21 +404,6 @@ describe('Abstract pool test suite', () => { ).toThrowError( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - enableTasksQueue: true, - tasksQueueOptions: { queueMaxSize: 2 } - } - ) - ).toThrowError( - new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - ) expect( () => new FixedThreadPool( @@ -649,13 +634,17 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4 + size: 4, + tasksStealing: true, + tasksStealingOnBackPressure: true }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4 + size: 4, + tasksStealing: true, + tasksStealingOnBackPressure: true }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -671,12 +660,16 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4 + size: 4, + tasksStealing: true, + tasksStealingOnBackPressure: true }) pool.setTasksQueueOptions({ concurrency: 2 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4 + size: 4, + tasksStealing: true, + tasksStealingOnBackPressure: true }) expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') @@ -696,11 +689,6 @@ describe('Abstract pool test suite', () => { expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError( - new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - ) expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' @@ -726,6 +714,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: numberOfWorkers, @@ -747,6 +736,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: Math.floor(numberOfWorkers / 2), @@ -767,6 +757,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, @@ -801,7 +792,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) @@ -813,7 +804,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) @@ -827,6 +818,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.cluster, @@ -841,6 +833,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.thread, @@ -851,6 +844,30 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool can be started after initialization', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + startWorkers: false + } + ) + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + expect(pool.workerNodes).toStrictEqual([]) + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') + ) + pool.start() + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(pool.workerNodes.length).toBe(numberOfWorkers) + for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) + } + await pool.destroy() + }) + it('Verify that pool execute() arguments are checked', async () => { const pool = new FixedClusterPool( numberOfWorkers, @@ -869,8 +886,8 @@ describe('Abstract pool test suite', () => { "Task function 'unknown' not found" ) await pool.destroy() - await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( - new Error('Cannot execute a task on destroyed pool') + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') ) }) @@ -1038,6 +1055,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), @@ -1075,7 +1093,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1111,7 +1130,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1150,7 +1170,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), -- 2.34.1