skipWords: [
'christopher',
'comparator',
+ 'cpu',
+ 'cpus',
'ecma',
'enum',
'fibonacci',
'inheritDoc',
'jsdoc',
+ 'os',
'poolifier',
'readonly',
'serializable',
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.3.0] - 2022-dd-mm
+
+### Added
+
+- Pool worker choice strategies:
+ - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` strategy based on weighted round robin scheduling algorithm using tasks execution time for now.
+ - `WorkerChoiceStrategies.FAIR_SHARE` strategy based on fair share scheduling algorithm using tasks execution time for now.
+
## [2.2.2] - 2022-09-10
### Fixed
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in this pool in a round robbin fashion
- `WorkerChoiceStrategies.LESS_RECENTLY_USED`: Submit tasks to the less recently used worker in the pool
+ - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time for now
+ - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker using a fair share tasks scheduling algorithm
Default: `WorkerChoiceStrategies.ROUND_ROBIN`
const Benchmark = require('benchmark')
const {
dynamicClusterTest,
+ dynamicClusterTestFairShare,
dynamicClusterTestLessRecentlyUsed
} = require('./cluster/dynamic')
const {
fixedClusterTest,
+ fixedClusterTestFairShare,
fixedClusterTestLessRecentlyUsed
} = require('./cluster/fixed')
const {
dynamicThreadTest,
+ dynamicThreadTestFairShare,
dynamicThreadTestLessRecentlyUsed
} = require('./thread/dynamic')
const {
fixedThreadTest,
+ fixedThreadTestFairShare,
fixedThreadTestLessRecentlyUsed
} = require('./thread/fixed')
const { LIST_FORMATTER } = require('./benchmark-utils')
.add('Poolifier:Fixed:ThreadPool:LessRecentlyUsed', async function () {
await fixedThreadTestLessRecentlyUsed()
})
+ .add('Poolifier:Fixed:ThreadPool:FairShare', async function () {
+ await fixedThreadTestFairShare()
+ })
.add('Poolifier:Dynamic:ThreadPool', async function () {
await dynamicThreadTest()
})
.add('Poolifier:Dynamic:ThreadPool:LessRecentlyUsed', async function () {
await dynamicThreadTestLessRecentlyUsed()
})
+ .add('Poolifier:Dynamic:ThreadPool:FairShare', async function () {
+ await dynamicThreadTestFairShare()
+ })
.add('Poolifier:Fixed:ClusterPool', async function () {
await fixedClusterTest()
})
.add('Poolifier:Fixed:ClusterPool:LessRecentlyUsed', async function () {
await fixedClusterTestLessRecentlyUsed()
})
+ .add('Poolifier:Fixed:ClusterPool:FairShare', async function () {
+ await fixedClusterTestFairShare()
+ })
.add('Poolifier:Dynamic:ClusterPool', async function () {
await dynamicClusterTest()
})
.add('Poolifier:Dynamic:ClusterPool:LessRecentlyUsed', async function () {
await dynamicClusterTestLessRecentlyUsed()
})
+ .add('Poolifier:Dynamic:ClusterPool:FairShare', async function () {
+ await dynamicClusterTestFairShare()
+ })
// Add listeners
.on('cycle', function (event) {
console.log(event.target.toString())
}
return null
})
- .catch(err => console.error(err))
+ .catch(err => {
+ console.error(err)
+ return reject(err)
+ })
}
})
}
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
)
+const dynamicPoolFairShare = new DynamicClusterPool(
+ size / 2,
+ size * 3,
+ './benchmarks/internal/cluster/worker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+)
+
async function dynamicClusterTest (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
return runPoolifierTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
}
+async function dynamicClusterTestFairShare (
+ { tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
+) {
+ return runPoolifierTest(dynamicPoolFairShare, { tasks, workerData })
+}
+
module.exports = {
dynamicClusterTest,
+ dynamicClusterTestFairShare,
dynamicClusterTestLessRecentlyUsed
}
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
)
+const fixedPoolFairShare = new FixedClusterPool(
+ size,
+ './benchmarks/internal/cluster/worker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+)
+
async function fixedClusterTest (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
return runPoolifierTest(fixedPoolLessRecentlyUsed, { tasks, workerData })
}
-module.exports = { fixedClusterTest, fixedClusterTestLessRecentlyUsed }
+async function fixedClusterTestFairShare (
+ { tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
+) {
+ return runPoolifierTest(fixedPoolFairShare, { tasks, workerData })
+}
+
+module.exports = {
+ fixedClusterTest,
+ fixedClusterTestFairShare,
+ fixedClusterTestLessRecentlyUsed
+}
'use strict'
+const { isMaster } = require('cluster')
const { ClusterWorker } = require('../../../lib/index')
const { jsonIntegerSerialization } = require('../benchmark-utils')
+const debug = false
+
function yourFunction (data) {
jsonIntegerSerialization(1000)
- // console.log('This is the main thread ' + isMaster)
+ debug === true && console.debug('This is the main thread ' + isMaster)
return { ok: 1 }
}
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
)
+const dynamicPoolFairShare = new DynamicThreadPool(
+ size / 2,
+ size * 3,
+ './benchmarks/internal/thread/worker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+)
+
async function dynamicThreadTest (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
return runPoolifierTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
}
+async function dynamicThreadTestFairShare (
+ { tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
+) {
+ return runPoolifierTest(dynamicPoolFairShare, { tasks, workerData })
+}
+
module.exports = {
dynamicThreadTest,
+ dynamicThreadTestFairShare,
dynamicThreadTestLessRecentlyUsed
}
{ workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
)
+const fixedPoolFairShare = new FixedThreadPool(
+ size,
+ './benchmarks/internal/thread/worker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+)
+
async function fixedThreadTest (
{ tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
) {
return runPoolifierTest(fixedPoolLessRecentlyUsed, { tasks, workerData })
}
-module.exports = { fixedThreadTest, fixedThreadTestLessRecentlyUsed }
+async function fixedThreadTestFairShare (
+ { tasks, workerData } = { tasks: numberOfTasks, workerData: { proof: 'ok' } }
+) {
+ return runPoolifierTest(fixedPoolFairShare, { tasks, workerData })
+}
+
+module.exports = {
+ fixedThreadTest,
+ fixedThreadTestFairShare,
+ fixedThreadTestLessRecentlyUsed
+}
'use strict'
+const { isMainThread } = require('worker_threads')
const { ThreadWorker } = require('../../../lib/index')
const { jsonIntegerSerialization } = require('../benchmark-utils')
+const debug = false
+
function yourFunction (data) {
jsonIntegerSerialization(1000)
- // console.log('This is the main thread ' + isMainThread)
+ debug === true && console.debug('This is the main thread ' + isMainThread)
return { ok: 1 }
}
...(isDevelopmentBuild && { preserveModulesRoot: 'src' }),
...(!isDevelopmentBuild && { plugins: [terser({ numWorkers: 2 })] })
},
- external: ['async_hooks', 'cluster', 'events', 'worker_threads'],
+ external: ['async_hooks', 'cluster', 'events', 'os', 'worker_threads'],
plugins: [
ts({
tsconfig: isDevelopmentBuild
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Worker virtual task timestamp.
+ */
+type WorkerVirtualTaskTimestamp = {
+ start: number
+ end: number
+}
+
+/**
+ * Selects the next worker with a fair share scheduling algorithm.
+ * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class FairShareWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /**
+ * Worker last virtual task execution timestamp.
+ */
+ private workerLastVirtualTaskTimestamp: Map<
+ Worker,
+ WorkerVirtualTaskTimestamp
+ > = new Map<Worker, WorkerVirtualTaskTimestamp>()
+
+ /** @inheritDoc */
+ public choose (): Worker {
+ this.updateWorkerLastVirtualTaskTimestamp()
+ let minWorkerVirtualTaskEndTimestamp = Infinity
+ let chosenWorker!: Worker
+ for (const worker of this.pool.workers) {
+ const workerLastVirtualTaskEndTimestamp =
+ this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
+ if (
+ workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
+ ) {
+ minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
+ chosenWorker = worker
+ }
+ }
+ return chosenWorker
+ }
+
+ /**
+ * Compute workers last virtual task timestamp.
+ */
+ private updateWorkerLastVirtualTaskTimestamp () {
+ for (const worker of this.pool.workers) {
+ const workerVirtualTaskStartTimestamp = Math.max(
+ Date.now(),
+ this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
+ )
+ const workerVirtualTaskEndTimestamp =
+ workerVirtualTaskStartTimestamp +
+ (this.pool.getWorkerAverageTasksRunTime(worker) ?? 0)
+ this.workerLastVirtualTaskTimestamp.set(worker, {
+ start: workerVirtualTaskStartTimestamp,
+ end: workerVirtualTaskEndTimestamp
+ })
+ }
+ }
+}
* Less recently used worker selection strategy.
*/
LESS_RECENTLY_USED: 'LESS_RECENTLY_USED',
+ /**
+ * Fair share worker selection strategy.
+ */
+ FAIR_SHARE: 'FAIR_SHARE',
/**
* Weighted round robin worker selection strategy.
*/
import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
+import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
import type {
return new RoundRobinWorkerChoiceStrategy(pool)
case WorkerChoiceStrategies.LESS_RECENTLY_USED:
return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.FAIR_SHARE:
+ return new FairShareWorkerChoiceStrategy(pool)
case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
return new WeightedRoundRobinWorkerChoiceStrategy(pool)
default:
>()
/**
- * Constructs a worker choice strategy that selects based a weighted round robin scheduling algorithm.
+ * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
*
* @param pool The pool instance.
*/
const {
LessRecentlyUsedWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/less-recently-used-worker-choice-strategy')
+const {
+ FairShareWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
+// const {
+// WeightedRoundRobinWorkerChoiceStrategy
+// } = require('../../../lib/pools/selection-strategies/weighted-round-robin-choice-strategy')
describe('Selection strategies utils test suite', () => {
let pool
expect(strategy).toBeInstanceOf(LessRecentlyUsedWorkerChoiceStrategy)
})
+ it('Verify that getWorkerChoiceStrategy() can return FAIR_SHARE strategy', () => {
+ const strategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ pool,
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
+ expect(strategy).toBeInstanceOf(FairShareWorkerChoiceStrategy)
+ })
+
+ // it('Verify that getWorkerChoiceStrategy() can return WEIGHTED_ROUND_ROBIN strategy', () => {
+ // const strategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ // pool,
+ // WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ // )
+ // expect(strategy).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
+ // })
+
it('Verify that getWorkerChoiceStrategy() throw error on unknown strategy', () => {
expect(() => {
SelectionStrategiesUtils.getWorkerChoiceStrategy(pool, 'UNKNOWN_STRATEGY')
it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED')
+ expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
'WEIGHTED_ROUND_ROBIN'
)
await pool.destroy()
})
+ it('Verify FAIR_SHARE strategy is taken at pool creation', async () => {
+ const max = 3
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+ )
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify FAIR_SHARE strategy can be set after pool creation', async () => {
+ const max = 3
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
+ const max = 3
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+ )
+ // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify FAIR_SHARE strategy can be run in a dynamic pool', async () => {
+ const min = 0
+ const max = 3
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+ )
+ // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => {
const max = 3
const pool = new FixedThreadPool(
DynamicThreadPool,
WorkerChoiceStrategies
} = require('../../../lib/index')
+const {
+ WorkerChoiceStrategyContext
+} = require('../../../lib/pools/selection-strategies/worker-choice-strategy-context')
const {
RoundRobinWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/round-robin-worker-choice-strategy')
LessRecentlyUsedWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/less-recently-used-worker-choice-strategy')
const {
- WorkerChoiceStrategyContext
-} = require('../../../lib/pools/selection-strategies/worker-choice-strategy-context')
+ FairShareWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
+// const {
+// WeightedRoundRobinWorkerChoiceStrategy
+// } = require('../../../lib/pools/selection-strategies/weighted-round-robin-choice-strategy')
const {
DynamicPoolWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/dynamic-pool-worker-choice-strategy')
)
})
- it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {
+ it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => {
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
dynamicPool
)
)
})
- it('Verify that setWorkerChoiceStrategy() works with LESS_RECENTLY_USED and fixed pool', () => {
+ it('Verify that setWorkerChoiceStrategy() works with LESS_RECENTLY_USED and dynamic pool', () => {
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
dynamicPool
)
DynamicPoolWorkerChoiceStrategy
)
})
+
+ it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ fixedPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
+ expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
+ FairShareWorkerChoiceStrategy
+ )
+ })
+
+ it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => {
+ const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ dynamicPool
+ )
+ workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
+ expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
+ DynamicPoolWorkerChoiceStrategy
+ )
+ })
+
+ // it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => {
+ // const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ // fixedPool
+ // )
+ // workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ // WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ // )
+ // expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
+ // WeightedRoundRobinWorkerChoiceStrategy
+ // )
+ // })
+
+ // it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
+ // const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ // dynamicPool
+ // )
+ // workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ // WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ // )
+ // expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
+ // DynamicPoolWorkerChoiceStrategy
+ // )
+ // })
})