protected removeWorker (worker: Worker): void {
// Clean worker from data structure
this.workers.splice(this.getWorkerIndex(worker), 1)
- this.resetWorkerTasksUsage(worker)
+ this.removeWorkerTasksUsage(worker)
}
/**
}
/**
- * Increase the number of tasks that the given worker has applied.
+ * Increases the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks is increased.
*/
}
/**
- * Decrease the number of tasks that the given worker has applied.
+ * Decreases the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks is decreased.
*/
}
/**
- * Step the number of tasks that the given worker has applied.
+ * Steps the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks are stepped.
* @param step Number of running tasks step.
}
/**
- * Step the number of tasks that the given worker has run.
+ * Steps the number of tasks that the given worker has run.
*
* @param worker Worker which has run tasks.
* @param step Number of run tasks step.
*/
- private stepWorkerRunTasks (worker: Worker, step: number) {
+ private stepWorkerRunTasks (worker: Worker, step: number): void {
const tasksUsage = this.workersTasksUsage.get(worker)
if (tasksUsage !== undefined) {
tasksUsage.run = tasksUsage.run + step
}
/**
- * Update tasks run time for the given worker.
+ * Updates tasks run time for the given worker.
*
* @param worker Worker which run the task.
* @param taskRunTime Worker task run time.
private updateWorkerTasksRunTime (
worker: Worker,
taskRunTime: number | undefined
- ) {
- const tasksUsage = this.workersTasksUsage.get(worker)
- if (tasksUsage !== undefined && tasksUsage.run !== 0) {
- tasksUsage.runTime += taskRunTime ?? 0
- tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
- this.workersTasksUsage.set(worker, tasksUsage)
- } else {
- throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+ ): void {
+ if (
+ this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime === true
+ ) {
+ const tasksUsage = this.workersTasksUsage.get(worker)
+ if (tasksUsage !== undefined && tasksUsage.run !== 0) {
+ tasksUsage.runTime += taskRunTime ?? 0
+ tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
+ this.workersTasksUsage.set(worker, tasksUsage)
+ } else {
+ throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
+ }
}
}
/**
- * Reset worker tasks usage statistics.
+ * Removes worker tasks usage statistics.
*
* @param worker The worker.
*/
- private resetWorkerTasksUsage (worker: Worker): void {
+ private removeWorkerTasksUsage (worker: Worker): void {
this.workersTasksUsage.delete(worker)
}
}
import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
import { PoolType } from '../pool-internal'
-import type { IWorkerChoiceStrategy } from './selection-strategies-types'
+import type {
+ IWorkerChoiceStrategy,
+ RequiredStatistics
+} from './selection-strategies-types'
/**
* Abstract worker choice strategy class.
> implements IWorkerChoiceStrategy<Worker> {
/** @inheritDoc */
public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+ /** @inheritDoc */
+ public requiredStatistics: RequiredStatistics = {
+ runTime: false
+ }
/**
* Constructs a worker choice strategy attached to the pool.
this.pool,
workerChoiceStrategy
)
+ this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics
}
/** @inheritDoc */
import type { AbstractPoolWorker } from '../abstract-pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type { RequiredStatistics } from './selection-strategies-types'
/**
* Worker virtual task timestamp.
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /** @inheritDoc */
+ public requiredStatistics: RequiredStatistics = {
+ runTime: true
+ }
+
/**
* Worker last virtual task execution timestamp.
*/
*/
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+/**
+ * Tasks usage statistics requirements.
+ */
+export type RequiredStatistics = {
+ runTime: boolean
+}
+
/**
* Worker choice strategy interface.
*
* Is the pool attached to the strategy dynamic?.
*/
isDynamicPool: boolean
+ /**
+ * Required tasks usage statistics.
+ */
+ requiredStatistics: RequiredStatistics
/**
* Choose a worker in the pool.
*/
import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type { RequiredStatistics } from './selection-strategies-types'
/**
* Task run time.
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /** @inheritDoc */
+ public requiredStatistics: RequiredStatistics = {
+ runTime: true
+ }
+
/**
* Worker index where the previous task was submitted.
*/
)
}
+ /**
+ * Get the worker choice strategy used in the context.
+ *
+ * @returns The worker choice strategy.
+ */
+ public getWorkerChoiceStrategy (): IWorkerChoiceStrategy<Worker> {
+ return this.workerChoiceStrategy
+ }
+
/**
* Set the worker choice strategy to use in the context.
*
pool.destroy()
})
- it('Simulate worker not found during updateWorkerTasksRunTime', () => {
+ it('Simulate worker not found during updateWorkerTasksRunTime with strategy not requiring it', () => {
const pool = new StubPoolWithWorkerTasksUsageMapClear(
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js',
)
// Simulate worker not found.
pool.removeAllWorker()
+ expect(() => pool.updateWorkerTasksRunTime()).not.toThrowError()
+ pool.destroy()
+ })
+
+ it('Simulate worker not found during updateWorkerTasksRunTime with strategy requiring it', () => {
+ const pool = new StubPoolWithWorkerTasksUsageMapClear(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js',
+ {
+ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
+ errorHandler: e => console.error(e)
+ }
+ )
+ // Simulate worker not found.
+ pool.removeAllWorker()
expect(() => pool.updateWorkerTasksRunTime()).toThrowError(
workerNotFoundInTasksUsageMapError
)
await pool.destroy()
})
+ it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+ const min = 0
+ const max = 3
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(false)
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(false)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
const max = 3
const pool = new FixedThreadPool(
await pool.destroy()
})
+ it('Verify LESS_RECENTLY_USED strategy default tasks usage statistics requirements', async () => {
+ const min = 0
+ const max = 3
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(false)
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(false)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => {
const max = 3
const pool = new FixedThreadPool(
await pool.destroy()
})
+ it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
+ const min = 0
+ const max = 3
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => {
const max = 3
const pool = new FixedThreadPool(
await pool.destroy()
})
+ it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+ const min = 0
+ const max = 3
+ let pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+ expect(
+ pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+ .requiredStatistics.runTime
+ ).toBe(true)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
const max = 3
const pool = new FixedThreadPool(