const Benchmark = require('benchmark')
-const { dynamicClusterTest } = require('./cluster/dynamic')
+const {
+ dynamicClusterTest,
+ dynamicClusterTestLessRecentlyUsed
+} = require('./cluster/dynamic')
const { fixedClusterTest } = require('./cluster/fixed')
-const { dynamicThreadTest } = require('./thread/dynamic')
+const {
+ dynamicThreadTest,
+ dynamicThreadTestLessRecentlyUsed
+} = require('./thread/dynamic')
const { fixedThreadTest } = require('./thread/fixed')
-const suite = new Benchmark.Suite()
+const suite = new Benchmark.Suite('poolifier')
const LIST_FORMATTER = new Intl.ListFormat('en-US', {
style: 'long',
async function test () {
// Add tests
suite
- .add('Pioardi:Static:ThreadPool', async function () {
+ .add('Poolifier:Static:ThreadPool', async function () {
await fixedThreadTest()
})
- .add('Pioardi:Dynamic:ThreadPool', async function () {
+ .add('Poolifier:Dynamic:ThreadPool', async function () {
await dynamicThreadTest()
})
- .add('Pioardi:Static:ClusterPool', async function () {
+ .add('Poolifier:Dynamic:ThreadPool:LessRecentlyUsed', async function () {
+ await dynamicThreadTestLessRecentlyUsed()
+ })
+ .add('Poolifier:Static:ClusterPool', async function () {
await fixedClusterTest()
})
- .add('Pioardi:Dynamic:ClusterPool', async function () {
+ .add('Poolifier:Dynamic:ClusterPool', async function () {
await dynamicClusterTest()
})
+ .add('Poolifier:Dynamic:ClusterPool:LessRecentlyUsed', async function () {
+ await dynamicClusterTestLessRecentlyUsed()
+ })
// Add listeners
.on('cycle', function (event) {
console.log(event.target.toString())
// eslint-disable-next-line no-process-exit
process.exit()
})
- .run()
+ .run({ async: true, queued: true })
}
--- /dev/null
+async function runTest (pool, { tasks, workerData }) {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ pool
+ .execute(workerData)
+ .then(res => {
+ executions++
+ if (executions === tasks) {
+ return resolve('FINISH')
+ }
+ return null
+ })
+ .catch(err => console.error(err))
+ }
+ })
+}
+
+module.exports = { runTest }
let nextWorkerIndex = 0
-function chooseWorkerTernary () {
+function chooseWorkerTernaryOffByOne () {
nextWorkerIndex =
workers.length - 1 === nextWorkerIndex ? 0 : nextWorkerIndex + 1
return workers[nextWorkerIndex]
}
suite
- .add('Ternary', function () {
+ .add('Ternary off by one', function () {
nextWorkerIndex = 0
- chooseWorkerTernary()
+ chooseWorkerTernaryOffByOne()
})
.add('Ternary with negation', function () {
nextWorkerIndex = 0
-const { DynamicClusterPool } = require('../../../lib/index')
+const {
+ DynamicClusterPool,
+ WorkerChoiceStrategies
+} = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
const size = 30
'./benchmarks/internal/cluster/worker.js'
)
+const dynamicPoolLessRecentlyUsed = new DynamicClusterPool(
+ size / 2,
+ size * 3,
+ './benchmarks/internal/cluster/worker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+)
+
async function dynamicClusterTest (
{ tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
) {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- dynamicPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => console.error(err))
- }
- })
+ return runTest(dynamicPool, { tasks, workerData })
}
-module.exports = { dynamicClusterTest }
+async function dynamicClusterTestLessRecentlyUsed (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
+}
+
+module.exports = {
+ dynamicClusterTest,
+ dynamicClusterTestLessRecentlyUsed
+}
const { FixedClusterPool } = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
const size = 30
async function fixedClusterTest (
{ tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
) {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- fixedPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => {
- console.error(err)
- })
- }
- })
+ return runTest(fixedPool, { tasks, workerData })
}
module.exports = { fixedClusterTest }
}
JSON.stringify(o)
}
- // console.log('This is the main thread ' + isMainThread)
+ // console.log('This is the main thread ' + isMaster)
return { ok: 1 }
}
-const { DynamicThreadPool } = require('../../../lib/index')
+const {
+ DynamicThreadPool,
+ WorkerChoiceStrategies
+} = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
const size = 30
-const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js')
+const dynamicPool = new DynamicThreadPool(
+ size / 2,
+ size * 3,
+ './benchmarks/internal/thread/worker.js'
+)
+
+const dynamicPoolLessRecentlyUsed = new DynamicThreadPool(
+ size / 2,
+ size * 3,
+ './benchmarks/internal/thread/worker.js',
+ { workerChoiceStrategy: DynamicThreadPool.LESS_RECENTLY_USED }
+)
async function dynamicThreadTest (
{ tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
) {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- dynamicPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => console.error(err))
- }
- })
+ return runTest(dynamicPool, { tasks, workerData })
+}
+
+async function dynamicThreadTestLessRecentlyUsed (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
}
-module.exports = { dynamicThreadTest }
+module.exports = {
+ dynamicThreadTest,
+ dynamicThreadTestLessRecentlyUsed
+}
const { FixedThreadPool } = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
const size = 30
-const fixedPool = new FixedThreadPool(size, './worker.js')
+const fixedPool = new FixedThreadPool(
+ size,
+ './benchmarks/internal/thread/worker.js'
+)
async function fixedThreadTest (
{ tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
) {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- fixedPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => {
- console.error(err)
- })
- }
- })
+ return runTest(fixedPool, { tasks, workerData })
}
module.exports = { fixedThreadTest }
"picomatch": "^2.0.5"
}
},
+ "microtime": {
+ "version": "3.0.0",
+ "resolved": "https://registry.npmjs.org/microtime/-/microtime-3.0.0.tgz",
+ "integrity": "sha512-SirJr7ZL4ow2iWcb54bekS4aWyBQNVcEDBiwAz9D/sTgY59A+uE8UJU15cp5wyZmPBwg/3zf8lyCJ5NUe1nVlQ==",
+ "dev": true,
+ "requires": {
+ "node-addon-api": "^1.2.0",
+ "node-gyp-build": "^3.8.0"
+ }
+ },
"mimic-fn": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz",
"integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==",
"dev": true
},
+ "node-addon-api": {
+ "version": "1.7.2",
+ "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-1.7.2.tgz",
+ "integrity": "sha512-ibPK3iA+vaY1eEjESkQkM0BbCqFOaZMiXRTtdB0u7b4djtY6JnsjvPdUHVMg6xQt3B8fpTTWHI9A+ADjM9frzg==",
+ "dev": true
+ },
+ "node-gyp-build": {
+ "version": "3.9.0",
+ "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.9.0.tgz",
+ "integrity": "sha512-zLcTg6P4AbcHPq465ZMFNXx7XpKKJh+7kkN699NiQWisR2uWYOWNWqRHAmbnmKiL4e9aLSlmy5U7rEMUXV59+A==",
+ "dev": true
+ },
"node-preload": {
"version": "0.2.1",
"resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz",
"eslint-plugin-promise": "^4.3.1",
"eslint-plugin-spellcheck": "0.0.17",
"expect": "^26.6.2",
+ "microtime": "^3.0.0",
"mocha": "^8.3.0",
"mocha-lcov-reporter": "^1.3.0",
"nyc": "^15.1.0",
throw new Error(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (!this.isDynamic() && numberOfWorkers === 0) {
+ } else if (!this.dynamic && numberOfWorkers === 0) {
throw new Error('Cannot instantiate a fixed pool with no worker')
}
}
/** @inheritdoc */
- public isDynamic (): boolean {
+ public get dynamic (): boolean {
return false
}
}
/** @inheritdoc */
- public isDynamic (): boolean {
+ public get dynamic (): boolean {
return true
}
}
*/
readonly emitter: PoolEmitter
- /**
- * Maximum number of workers that can be created by this pool.
- */
- readonly max?: number
-
/**
* Whether the pool is dynamic or not.
*
* If it is dynamic, it provides the `max` property.
*/
- isDynamic(): boolean
+ readonly dynamic: boolean
+
+ /**
+ * Maximum number of workers that can be created by this pool.
+ */
+ readonly max?: number
/**
* Creates a new worker for this pool and sets it up completely.
/** @inheritdoc */
public choose (): Worker {
+ const isPoolDynamic = this.pool.dynamic
let minNumberOfTasks = Infinity
// A worker is always found because it picks the one with fewer tasks
let lessRecentlyUsedWorker!: Worker
for (const [worker, numberOfTasks] of this.pool.tasks) {
- if (numberOfTasks === 0) {
+ if (!isPoolDynamic && numberOfTasks === 0) {
return worker
} else if (numberOfTasks < minNumberOfTasks) {
minNumberOfTasks = numberOfTasks
}
}
-/**
- * Get the worker choice strategy instance.
- *
- * @param pool The pool instance.
- * @param workerChoiceStrategy The worker choice strategy.
- * @returns The worker choice strategy instance.
- */
-function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
- pool: IPoolInternal<Worker, Data, Response>,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-): IWorkerChoiceStrategy<Worker> {
- switch (workerChoiceStrategy) {
- case WorkerChoiceStrategies.ROUND_ROBIN:
- return new RoundRobinWorkerChoiceStrategy(pool)
- case WorkerChoiceStrategies.LESS_RECENTLY_USED:
- return new LessRecentlyUsedWorkerChoiceStrategy(pool)
- default:
- throw new Error(
- `Worker choice strategy '${workerChoiceStrategy}' not found`
- )
- }
-}
-
/**
* Dynamically choose a worker.
*
private readonly pool: IPoolInternal<Worker, Data, Response>,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
) {
- this.workerChoiceStrategy = getWorkerChoiceStrategy(
+ this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
this.pool,
workerChoiceStrategy
)
}
- /**
- * Find a free worker based on number of tasks the worker has applied.
- *
- * If a worker was found that has `0` tasks, it is detected as free and will be returned.
- *
- * If no free worker was found, `null` will be returned.
- *
- * @returns A free worker if there was one, otherwise `null`.
- */
- private findFreeWorkerBasedOnTasks (): Worker | null {
- for (const [worker, numberOfTasks] of this.pool.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, use it
- return worker
- }
- }
- return null
- }
-
/** @inheritdoc */
public choose (): Worker {
- const freeWorker = this.findFreeWorkerBasedOnTasks()
+ const freeWorker = SelectionStrategiesUtils.findFreeWorkerBasedOnTasks(
+ this.pool
+ )
if (freeWorker) {
return freeWorker
}
private getPoolWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
): IWorkerChoiceStrategy<Worker> {
- if (this.pool.isDynamic()) {
+ if (this.pool.dynamic) {
return new DynamicPoolWorkerChoiceStrategy(
this.pool,
workerChoiceStrategy
)
}
- return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
+ return SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
}
/**
return this.workerChoiceStrategy.choose()
}
}
+
+/**
+ * Worker selection strategies helpers.
+ */
+class SelectionStrategiesUtils {
+ /**
+ * Find a free worker based on number of tasks the worker has applied.
+ *
+ * If a worker was found that has `0` tasks, it is detected as free and will be returned.
+ *
+ * If no free worker was found, `null` will be returned.
+ *
+ * @param pool The pool instance.
+ * @returns A free worker if there was one, otherwise `null`.
+ */
+ public static findFreeWorkerBasedOnTasks<
+ Worker extends IWorker,
+ Data,
+ Response
+ > (pool: IPoolInternal<Worker, Data, Response>): Worker | null {
+ for (const [worker, numberOfTasks] of pool.tasks) {
+ if (numberOfTasks === 0) {
+ // A worker is free, use it
+ return worker
+ }
+ }
+ return null
+ }
+
+ /**
+ * Get the worker choice strategy instance.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance.
+ */
+ public static getWorkerChoiceStrategy<
+ Worker extends IWorker,
+ Data,
+ Response
+ > (
+ pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ): IWorkerChoiceStrategy<Worker> {
+ switch (workerChoiceStrategy) {
+ case WorkerChoiceStrategies.ROUND_ROBIN:
+ return new RoundRobinWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+ return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+ default:
+ throw new Error(
+ `Worker choice strategy '${workerChoiceStrategy}' not found`
+ )
+ }
+ }
+}
}
/** @inheritdoc */
- public isDynamic (): boolean {
+ public get dynamic (): boolean {
return true
}
}
await pool.destroy()
})
- it('Verify LESS_RECENTLY_USED strategy can be run in a pool', async () => {
+ it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => {
const max = 3
const pool = new FixedThreadPool(
max,
await pool.destroy()
})
+ it('Verify LESS_RECENTLY_USED strategy can be run in a dynamic pool', async () => {
+ const min = 0
+ const max = 3
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+ )
+ // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify unknown strategies throw error', () => {
const min = 1
const max = 3