From: Jérôme Benoit Date: Tue, 23 Feb 2021 09:38:00 +0000 (+0100) Subject: Cleanups on bechmarking and strategies code: (#227) X-Git-Tag: v2.0.0~21 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=ff5e76e152be8540cba8118bb4e2b9da314dfff5;p=poolifier.git Cleanups on bechmarking and strategies code: (#227) + Factor out common code + Renaming + Add benchmarking on strategies + Add microtime to improve benchmark time resolution Signed-off-by: Jérôme Benoit --- diff --git a/benchmarks/internal/bench.js b/benchmarks/internal/bench.js index 2ad7f22c..c3db7165 100644 --- a/benchmarks/internal/bench.js +++ b/benchmarks/internal/bench.js @@ -1,10 +1,16 @@ const Benchmark = require('benchmark') -const { dynamicClusterTest } = require('./cluster/dynamic') +const { + dynamicClusterTest, + dynamicClusterTestLessRecentlyUsed +} = require('./cluster/dynamic') const { fixedClusterTest } = require('./cluster/fixed') -const { dynamicThreadTest } = require('./thread/dynamic') +const { + dynamicThreadTest, + dynamicThreadTestLessRecentlyUsed +} = require('./thread/dynamic') const { fixedThreadTest } = require('./thread/fixed') -const suite = new Benchmark.Suite() +const suite = new Benchmark.Suite('poolifier') const LIST_FORMATTER = new Intl.ListFormat('en-US', { style: 'long', @@ -19,18 +25,24 @@ setTimeout(async () => { async function test () { // Add tests suite - .add('Pioardi:Static:ThreadPool', async function () { + .add('Poolifier:Static:ThreadPool', async function () { await fixedThreadTest() }) - .add('Pioardi:Dynamic:ThreadPool', async function () { + .add('Poolifier:Dynamic:ThreadPool', async function () { await dynamicThreadTest() }) - .add('Pioardi:Static:ClusterPool', async function () { + .add('Poolifier:Dynamic:ThreadPool:LessRecentlyUsed', async function () { + await dynamicThreadTestLessRecentlyUsed() + }) + .add('Poolifier:Static:ClusterPool', async function () { await fixedClusterTest() }) - .add('Pioardi:Dynamic:ClusterPool', async function () { + .add('Poolifier:Dynamic:ClusterPool', async function () { await dynamicClusterTest() }) + .add('Poolifier:Dynamic:ClusterPool:LessRecentlyUsed', async function () { + await dynamicClusterTestLessRecentlyUsed() + }) // Add listeners .on('cycle', function (event) { console.log(event.target.toString()) @@ -43,5 +55,5 @@ async function test () { // eslint-disable-next-line no-process-exit process.exit() }) - .run() + .run({ async: true, queued: true }) } diff --git a/benchmarks/internal/benchmark-utils.js b/benchmarks/internal/benchmark-utils.js new file mode 100644 index 00000000..f84227d3 --- /dev/null +++ b/benchmarks/internal/benchmark-utils.js @@ -0,0 +1,19 @@ +async function runTest (pool, { tasks, workerData }) { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + pool + .execute(workerData) + .then(res => { + executions++ + if (executions === tasks) { + return resolve('FINISH') + } + return null + }) + .catch(err => console.error(err)) + } + }) +} + +module.exports = { runTest } diff --git a/benchmarks/internal/choose-worker.js b/benchmarks/internal/choose-worker.js index fe98eff1..92eadfe4 100644 --- a/benchmarks/internal/choose-worker.js +++ b/benchmarks/internal/choose-worker.js @@ -11,7 +11,7 @@ const workers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] let nextWorkerIndex = 0 -function chooseWorkerTernary () { +function chooseWorkerTernaryOffByOne () { nextWorkerIndex = workers.length - 1 === nextWorkerIndex ? 0 : nextWorkerIndex + 1 return workers[nextWorkerIndex] @@ -40,9 +40,9 @@ function chooseWorkerIncrementModulo () { } suite - .add('Ternary', function () { + .add('Ternary off by one', function () { nextWorkerIndex = 0 - chooseWorkerTernary() + chooseWorkerTernaryOffByOne() }) .add('Ternary with negation', function () { nextWorkerIndex = 0 diff --git a/benchmarks/internal/cluster/dynamic.js b/benchmarks/internal/cluster/dynamic.js index 7e607c87..7ad9dd28 100644 --- a/benchmarks/internal/cluster/dynamic.js +++ b/benchmarks/internal/cluster/dynamic.js @@ -1,4 +1,8 @@ -const { DynamicClusterPool } = require('../../../lib/index') +const { + DynamicClusterPool, + WorkerChoiceStrategies +} = require('../../../lib/index') +const { runTest } = require('../benchmark-utils') const size = 30 @@ -8,24 +12,26 @@ const dynamicPool = new DynamicClusterPool( './benchmarks/internal/cluster/worker.js' ) +const dynamicPoolLessRecentlyUsed = new DynamicClusterPool( + size / 2, + size * 3, + './benchmarks/internal/cluster/worker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED } +) + async function dynamicClusterTest ( { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } ) { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - dynamicPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => console.error(err)) - } - }) + return runTest(dynamicPool, { tasks, workerData }) } -module.exports = { dynamicClusterTest } +async function dynamicClusterTestLessRecentlyUsed ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData }) +} + +module.exports = { + dynamicClusterTest, + dynamicClusterTestLessRecentlyUsed +} diff --git a/benchmarks/internal/cluster/fixed.js b/benchmarks/internal/cluster/fixed.js index 1fd8d679..60a5b938 100644 --- a/benchmarks/internal/cluster/fixed.js +++ b/benchmarks/internal/cluster/fixed.js @@ -1,4 +1,5 @@ const { FixedClusterPool } = require('../../../lib/index') +const { runTest } = require('../benchmark-utils') const size = 30 @@ -10,23 +11,7 @@ const fixedPool = new FixedClusterPool( async function fixedClusterTest ( { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } ) { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - fixedPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => { - console.error(err) - }) - } - }) + return runTest(fixedPool, { tasks, workerData }) } module.exports = { fixedClusterTest } diff --git a/benchmarks/internal/cluster/worker.js b/benchmarks/internal/cluster/worker.js index c7211b7b..1692c284 100644 --- a/benchmarks/internal/cluster/worker.js +++ b/benchmarks/internal/cluster/worker.js @@ -8,7 +8,7 @@ function yourFunction (data) { } JSON.stringify(o) } - // console.log('This is the main thread ' + isMainThread) + // console.log('This is the main thread ' + isMaster) return { ok: 1 } } diff --git a/benchmarks/internal/thread/dynamic.js b/benchmarks/internal/thread/dynamic.js index f3f0e049..bc9b3d81 100644 --- a/benchmarks/internal/thread/dynamic.js +++ b/benchmarks/internal/thread/dynamic.js @@ -1,27 +1,37 @@ -const { DynamicThreadPool } = require('../../../lib/index') +const { + DynamicThreadPool, + WorkerChoiceStrategies +} = require('../../../lib/index') +const { runTest } = require('../benchmark-utils') const size = 30 -const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js') +const dynamicPool = new DynamicThreadPool( + size / 2, + size * 3, + './benchmarks/internal/thread/worker.js' +) + +const dynamicPoolLessRecentlyUsed = new DynamicThreadPool( + size / 2, + size * 3, + './benchmarks/internal/thread/worker.js', + { workerChoiceStrategy: DynamicThreadPool.LESS_RECENTLY_USED } +) async function dynamicThreadTest ( { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } ) { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - dynamicPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => console.error(err)) - } - }) + return runTest(dynamicPool, { tasks, workerData }) +} + +async function dynamicThreadTestLessRecentlyUsed ( + { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } +) { + return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData }) } -module.exports = { dynamicThreadTest } +module.exports = { + dynamicThreadTest, + dynamicThreadTestLessRecentlyUsed +} diff --git a/benchmarks/internal/thread/fixed.js b/benchmarks/internal/thread/fixed.js index 8168a6dc..7ad60eeb 100644 --- a/benchmarks/internal/thread/fixed.js +++ b/benchmarks/internal/thread/fixed.js @@ -1,29 +1,17 @@ const { FixedThreadPool } = require('../../../lib/index') +const { runTest } = require('../benchmark-utils') const size = 30 -const fixedPool = new FixedThreadPool(size, './worker.js') +const fixedPool = new FixedThreadPool( + size, + './benchmarks/internal/thread/worker.js' +) async function fixedThreadTest ( { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } ) { - return new Promise((resolve, reject) => { - let executions = 0 - for (let i = 0; i <= tasks; i++) { - fixedPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return resolve('FINISH') - } - return null - }) - .catch(err => { - console.error(err) - }) - } - }) + return runTest(fixedPool, { tasks, workerData }) } module.exports = { fixedThreadTest } diff --git a/package-lock.json b/package-lock.json index 3596c88e..cba84058 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2917,6 +2917,16 @@ "picomatch": "^2.0.5" } }, + "microtime": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/microtime/-/microtime-3.0.0.tgz", + "integrity": "sha512-SirJr7ZL4ow2iWcb54bekS4aWyBQNVcEDBiwAz9D/sTgY59A+uE8UJU15cp5wyZmPBwg/3zf8lyCJ5NUe1nVlQ==", + "dev": true, + "requires": { + "node-addon-api": "^1.2.0", + "node-gyp-build": "^3.8.0" + } + }, "mimic-fn": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz", @@ -3100,6 +3110,18 @@ "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==", "dev": true }, + "node-addon-api": { + "version": "1.7.2", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-1.7.2.tgz", + "integrity": "sha512-ibPK3iA+vaY1eEjESkQkM0BbCqFOaZMiXRTtdB0u7b4djtY6JnsjvPdUHVMg6xQt3B8fpTTWHI9A+ADjM9frzg==", + "dev": true + }, + "node-gyp-build": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.9.0.tgz", + "integrity": "sha512-zLcTg6P4AbcHPq465ZMFNXx7XpKKJh+7kkN699NiQWisR2uWYOWNWqRHAmbnmKiL4e9aLSlmy5U7rEMUXV59+A==", + "dev": true + }, "node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", diff --git a/package.json b/package.json index ad26b256..0a08c94a 100644 --- a/package.json +++ b/package.json @@ -74,6 +74,7 @@ "eslint-plugin-promise": "^4.3.1", "eslint-plugin-spellcheck": "0.0.17", "expect": "^26.6.2", + "microtime": "^3.0.0", "mocha": "^8.3.0", "mocha-lcov-reporter": "^1.3.0", "nyc": "^15.1.0", diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9d7e4f0c..7024f099 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -187,13 +187,13 @@ export abstract class AbstractPool< throw new Error( 'Cannot instantiate a pool with a negative number of workers' ) - } else if (!this.isDynamic() && numberOfWorkers === 0) { + } else if (!this.dynamic && numberOfWorkers === 0) { throw new Error('Cannot instantiate a fixed pool with no worker') } } /** @inheritdoc */ - public isDynamic (): boolean { + public get dynamic (): boolean { return false } diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 4a5f720a..f0c3c0e4 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -35,7 +35,7 @@ export class DynamicClusterPool< } /** @inheritdoc */ - public isDynamic (): boolean { + public get dynamic (): boolean { return true } } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index ddee27ea..d641fcf9 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -42,17 +42,17 @@ export interface IPoolInternal< */ readonly emitter: PoolEmitter - /** - * Maximum number of workers that can be created by this pool. - */ - readonly max?: number - /** * Whether the pool is dynamic or not. * * If it is dynamic, it provides the `max` property. */ - isDynamic(): boolean + readonly dynamic: boolean + + /** + * Maximum number of workers that can be created by this pool. + */ + readonly max?: number /** * Creates a new worker for this pool and sets it up completely. diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts index 5c40a71b..f86442b0 100644 --- a/src/pools/selection-strategies.ts +++ b/src/pools/selection-strategies.ts @@ -90,11 +90,12 @@ class LessRecentlyUsedWorkerChoiceStrategy< /** @inheritdoc */ public choose (): Worker { + const isPoolDynamic = this.pool.dynamic let minNumberOfTasks = Infinity // A worker is always found because it picks the one with fewer tasks let lessRecentlyUsedWorker!: Worker for (const [worker, numberOfTasks] of this.pool.tasks) { - if (numberOfTasks === 0) { + if (!isPoolDynamic && numberOfTasks === 0) { return worker } else if (numberOfTasks < minNumberOfTasks) { minNumberOfTasks = numberOfTasks @@ -105,29 +106,6 @@ class LessRecentlyUsedWorkerChoiceStrategy< } } -/** - * Get the worker choice strategy instance. - * - * @param pool The pool instance. - * @param workerChoiceStrategy The worker choice strategy. - * @returns The worker choice strategy instance. - */ -function getWorkerChoiceStrategy ( - pool: IPoolInternal, - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN -): IWorkerChoiceStrategy { - switch (workerChoiceStrategy) { - case WorkerChoiceStrategies.ROUND_ROBIN: - return new RoundRobinWorkerChoiceStrategy(pool) - case WorkerChoiceStrategies.LESS_RECENTLY_USED: - return new LessRecentlyUsedWorkerChoiceStrategy(pool) - default: - throw new Error( - `Worker choice strategy '${workerChoiceStrategy}' not found` - ) - } -} - /** * Dynamically choose a worker. * @@ -149,34 +127,17 @@ class DynamicPoolWorkerChoiceStrategy private readonly pool: IPoolInternal, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ) { - this.workerChoiceStrategy = getWorkerChoiceStrategy( + this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy( this.pool, workerChoiceStrategy ) } - /** - * Find a free worker based on number of tasks the worker has applied. - * - * If a worker was found that has `0` tasks, it is detected as free and will be returned. - * - * If no free worker was found, `null` will be returned. - * - * @returns A free worker if there was one, otherwise `null`. - */ - private findFreeWorkerBasedOnTasks (): Worker | null { - for (const [worker, numberOfTasks] of this.pool.tasks) { - if (numberOfTasks === 0) { - // A worker is free, use it - return worker - } - } - return null - } - /** @inheritdoc */ public choose (): Worker { - const freeWorker = this.findFreeWorkerBasedOnTasks() + const freeWorker = SelectionStrategiesUtils.findFreeWorkerBasedOnTasks( + this.pool + ) if (freeWorker) { return freeWorker } @@ -239,13 +200,16 @@ export class WorkerChoiceStrategyContext< private getPoolWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ): IWorkerChoiceStrategy { - if (this.pool.isDynamic()) { + if (this.pool.dynamic) { return new DynamicPoolWorkerChoiceStrategy( this.pool, workerChoiceStrategy ) } - return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy) + return SelectionStrategiesUtils.getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) } /** @@ -270,3 +234,59 @@ export class WorkerChoiceStrategyContext< return this.workerChoiceStrategy.choose() } } + +/** + * Worker selection strategies helpers. + */ +class SelectionStrategiesUtils { + /** + * Find a free worker based on number of tasks the worker has applied. + * + * If a worker was found that has `0` tasks, it is detected as free and will be returned. + * + * If no free worker was found, `null` will be returned. + * + * @param pool The pool instance. + * @returns A free worker if there was one, otherwise `null`. + */ + public static findFreeWorkerBasedOnTasks< + Worker extends IWorker, + Data, + Response + > (pool: IPoolInternal): Worker | null { + for (const [worker, numberOfTasks] of pool.tasks) { + if (numberOfTasks === 0) { + // A worker is free, use it + return worker + } + } + return null + } + + /** + * Get the worker choice strategy instance. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance. + */ + public static getWorkerChoiceStrategy< + Worker extends IWorker, + Data, + Response + > ( + pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ): IWorkerChoiceStrategy { + switch (workerChoiceStrategy) { + case WorkerChoiceStrategies.ROUND_ROBIN: + return new RoundRobinWorkerChoiceStrategy(pool) + case WorkerChoiceStrategies.LESS_RECENTLY_USED: + return new LessRecentlyUsedWorkerChoiceStrategy(pool) + default: + throw new Error( + `Worker choice strategy '${workerChoiceStrategy}' not found` + ) + } + } +} diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 04125854..b4fd46c1 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -36,7 +36,7 @@ export class DynamicThreadPool< } /** @inheritdoc */ - public isDynamic (): boolean { + public get dynamic (): boolean { return true } } diff --git a/tests/pools/selection-strategies.test.js b/tests/pools/selection-strategies.test.js index 604dda3d..d631afc6 100644 --- a/tests/pools/selection-strategies.test.js +++ b/tests/pools/selection-strategies.test.js @@ -39,7 +39,7 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LESS_RECENTLY_USED strategy can be run in a pool', async () => { + it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => { const max = 3 const pool = new FixedThreadPool( max, @@ -57,6 +57,26 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify LESS_RECENTLY_USED strategy can be run in a dynamic pool', async () => { + const min = 0 + const max = 3 + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED } + ) + // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify unknown strategies throw error', () => { const min = 1 const max = 3