## [Unreleased]
+### Fixed
+
+- Fix worker choice strategy retries mechanism on some edge cases.
+
## [2.6.30] - 2023-08-19
### Fixed
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
return workerNodeKey
}
workerId: workerInfo.id as number
})
workerInfo.dynamic = true
- if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ ) {
workerInfo.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
/**
* The next worker node key.
*/
- protected nextWorkerNodeKey: number = 0
+ protected nextWorkerNodeKey: number | undefined = 0
/** @inheritDoc */
public readonly strategyPolicy: StrategyPolicy = {
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: false
}
/** @inheritDoc */
public abstract update (workerNodeKey: number): boolean
/** @inheritDoc */
- public abstract choose (): number
+ public abstract choose (): number | undefined
/** @inheritDoc */
public abstract remove (workerNodeKey: number): boolean
: this.pool.workerNodes[workerNodeKey].usage.elu.active?.average ?? 0
}
+ /**
+ * Assign to nextWorkerNodeKey property the chosen worker node key.
+ *
+ * @param chosenWorkerNodeKey - The chosen worker node key.
+ */
+ protected assignChosenWorkerNodeKey (
+ chosenWorkerNodeKey: number | undefined
+ ): void {
+ if (chosenWorkerNodeKey != null) {
+ this.nextWorkerNodeKey = chosenWorkerNodeKey
+ } else {
+ this.nextWorkerNodeKey = undefined
+ }
+ }
+
protected computeDefaultWorkerWeight (): number {
let cpusCycleTimeWeight = 0
for (const cpu of cpus()) {
import {
type IWorkerChoiceStrategy,
Measurements,
+ type StrategyPolicy,
type TaskStatisticsRequirements,
type WorkerChoiceStrategyOptions
} from './selection-strategies-types'
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly strategyPolicy: StrategyPolicy = {
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
+ }
+
/** @inheritDoc */
public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
runTime: {
}
/** @inheritDoc */
- public choose (): number {
- return this.fairShareNextWorkerNodeKey()
+ public choose (): number | undefined {
+ const chosenWorkerNodeKey = this.fairShareNextWorkerNodeKey()
+ this.assignChosenWorkerNodeKey(chosenWorkerNodeKey)
+ return this.nextWorkerNodeKey
}
/** @inheritDoc */
return true
}
- private fairShareNextWorkerNodeKey (): number {
+ private fairShareNextWorkerNodeKey (): number | undefined {
let minWorkerVirtualTaskEndTimestamp = Infinity
+ let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) {
this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey)
workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
) {
minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeKey
+ return chosenWorkerNodeKey
}
/**
implements IWorkerChoiceStrategy {
/** @inheritDoc */
public readonly strategyPolicy: StrategyPolicy = {
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
}
/**
}
/** @inheritDoc */
- public choose (): number {
+ public choose (): number | undefined {
let roundId: number | undefined
let workerNodeId: number | undefined
for (
roundIndex++
) {
for (
- let workerNodeKey = this.nextWorkerNodeKey;
+ let workerNodeKey = this.nextWorkerNodeKey ?? 0;
workerNodeKey < this.pool.workerNodes.length;
workerNodeKey++
) {
}
}
}
- this.roundId = roundId ?? 0
- this.nextWorkerNodeKey = workerNodeId ?? 0
+ this.roundId = roundId as number
+ this.nextWorkerNodeKey = workerNodeId
const chosenWorkerNodeKey = this.nextWorkerNodeKey
if (this.nextWorkerNodeKey === this.pool.workerNodes.length - 1) {
this.nextWorkerNodeKey = 0
this.roundId =
this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
} else {
- this.nextWorkerNodeKey = this.nextWorkerNodeKey + 1
+ this.nextWorkerNodeKey = (this.nextWorkerNodeKey ?? 0) + 1
}
return chosenWorkerNodeKey
}
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ StrategyPolicy,
TaskStatisticsRequirements,
WorkerChoiceStrategyOptions
} from './selection-strategies-types'
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly strategyPolicy: StrategyPolicy = {
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
+ }
+
/** @inheritDoc */
public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
runTime: {
}
/** @inheritDoc */
- public choose (): number {
- return this.leastBusyNextWorkerNodeKey()
+ public choose (): number | undefined {
+ const chosenWorkerNodeKey = this.leastBusyNextWorkerNodeKey()
+ this.assignChosenWorkerNodeKey(chosenWorkerNodeKey)
+ return this.nextWorkerNodeKey
}
/** @inheritDoc */
return true
}
- private leastBusyNextWorkerNodeKey (): number {
+ private leastBusyNextWorkerNodeKey (): number | undefined {
let minTime = Infinity
+ let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerTime =
(workerNode.usage.runTime?.aggregate ?? 0) +
(workerNode.usage.waitTime?.aggregate ?? 0)
if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) {
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
break
} else if (
this.isWorkerNodeEligible(workerNodeKey) &&
workerTime < minTime
) {
minTime = workerTime
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeKey
+ return chosenWorkerNodeKey
}
}
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ StrategyPolicy,
TaskStatisticsRequirements,
WorkerChoiceStrategyOptions
} from './selection-strategies-types'
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly strategyPolicy: StrategyPolicy = {
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
+ }
+
/** @inheritDoc */
public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
runTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
}
/** @inheritDoc */
- public choose (): number {
- return this.leastEluNextWorkerNodeKey()
+ public choose (): number | undefined {
+ const chosenWorkerNodeKey = this.leastEluNextWorkerNodeKey()
+ this.assignChosenWorkerNodeKey(chosenWorkerNodeKey)
+ return this.nextWorkerNodeKey
}
/** @inheritDoc */
return true
}
- private leastEluNextWorkerNodeKey (): number {
+ private leastEluNextWorkerNodeKey (): number | undefined {
let minWorkerElu = Infinity
+ let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerUsage = workerNode.usage
const workerElu = workerUsage.elu?.active?.aggregate ?? 0
if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) {
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
break
} else if (
this.isWorkerNodeEligible(workerNodeKey) &&
workerElu < minWorkerElu
) {
minWorkerElu = workerElu
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeKey
+ return chosenWorkerNodeKey
}
}
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ StrategyPolicy,
WorkerChoiceStrategyOptions
} from './selection-strategies-types'
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly strategyPolicy: StrategyPolicy = {
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
+ }
+
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
}
/** @inheritDoc */
- public choose (): number {
- return this.leastUsedNextWorkerNodeKey()
+ public choose (): number | undefined {
+ const chosenWorkerNodeKey = this.leastUsedNextWorkerNodeKey()
+ this.assignChosenWorkerNodeKey(chosenWorkerNodeKey)
+ return this.nextWorkerNodeKey
}
/** @inheritDoc */
return true
}
- private leastUsedNextWorkerNodeKey (): number {
+ private leastUsedNextWorkerNodeKey (): number | undefined {
let minNumberOfTasks = Infinity
+ let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerTaskStatistics = workerNode.usage.tasks
const workerTasks =
workerTaskStatistics.executing +
workerTaskStatistics.queued
if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) {
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
break
} else if (
this.isWorkerNodeEligible(workerNodeKey) &&
workerTasks < minNumberOfTasks
) {
minNumberOfTasks = workerTasks
- this.nextWorkerNodeKey = workerNodeKey
+ chosenWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeKey
+ return chosenWorkerNodeKey
}
}
implements IWorkerChoiceStrategy {
/** @inheritDoc */
public readonly strategyPolicy: StrategyPolicy = {
- useDynamicWorker: true
+ dynamicWorkerUsage: true,
+ dynamicWorkerReady: true
}
/** @inheritDoc */
}
/** @inheritDoc */
- public choose (): number {
+ public choose (): number | undefined {
const chosenWorkerNodeKey = this.nextWorkerNodeKey
- do {
- this.roundRobinNextWorkerNodeKey()
- } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
+ this.roundRobinNextWorkerNodeKey()
+ if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) {
+ this.nextWorkerNodeKey = undefined
+ }
return chosenWorkerNodeKey
}
return true
}
- private roundRobinNextWorkerNodeKey (): number {
+ private roundRobinNextWorkerNodeKey (): number | undefined {
this.nextWorkerNodeKey =
this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
? 0
- : this.nextWorkerNodeKey + 1
+ : (this.nextWorkerNodeKey ?? 0) + 1
return this.nextWorkerNodeKey
}
}
*/
export interface StrategyPolicy {
/**
- * Expects direct usage of the newly created dynamic worker.
+ * Expects tasks execution on the newly created dynamic worker.
*/
- readonly useDynamicWorker: boolean
+ readonly dynamicWorkerUsage: boolean
+ /**
+ * Expects the newly created dynamic worker to be flagged as ready.
+ */
+ readonly dynamicWorkerReady: boolean
}
/**
readonly update: (workerNodeKey: number) => boolean
/**
* Chooses a worker node in the pool and returns its key.
+ * If the worker node is not eligible, `undefined` is returned.
*
- * @returns The worker node key.
+ * @returns The worker node key or `undefined`.
*/
- readonly choose: () => number
+ readonly choose: () => number | undefined
/**
* Removes the worker node key from strategy internals.
*
implements IWorkerChoiceStrategy {
/** @inheritDoc */
public readonly strategyPolicy: StrategyPolicy = {
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
}
/** @inheritDoc */
}
/** @inheritDoc */
- public choose (): number {
+ public choose (): number | undefined {
const chosenWorkerNodeKey = this.nextWorkerNodeKey
- do {
- this.weightedRoundRobinNextWorkerNodeKey()
- } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
+ this.weightedRoundRobinNextWorkerNodeKey()
+ if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) {
+ this.nextWorkerNodeKey = undefined
+ }
return chosenWorkerNodeKey
}
return true
}
- private weightedRoundRobinNextWorkerNodeKey (): number {
+ private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
const workerWeight =
- this.opts.weights?.[this.nextWorkerNodeKey] ?? this.defaultWorkerWeight
+ this.opts.weights?.[this.nextWorkerNodeKey ?? 0] ??
+ this.defaultWorkerWeight
if (workerVirtualTaskRunTime < workerWeight) {
this.workerVirtualTaskRunTime =
workerVirtualTaskRunTime +
- this.getWorkerTaskRunTime(this.nextWorkerNodeKey)
+ this.getWorkerTaskRunTime(this.nextWorkerNodeKey ?? 0)
} else {
this.nextWorkerNodeKey =
this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
? 0
- : this.nextWorkerNodeKey + 1
+ : (this.nextWorkerNodeKey ?? 0) + 1
this.workerVirtualTaskRunTime = 0
}
return this.nextWorkerNodeKey
const { expect } = require('expect')
+const sinon = require('sinon')
const {
DynamicClusterPool,
DynamicThreadPool,
})
it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
- const pool = new DynamicThreadPool(
- Math.floor(numberOfWorkers / 2),
+ const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js',
{
enableTasksQueue: true
}
)
+ for (const workerNode of pool.workerNodes) {
+ workerNode.hasBackPressure = sinon
+ .stub()
+ .onFirstCall()
+ .returns(true)
+ .returns(false)
+ }
const promises = new Set()
let poolBackPressure = 0
let poolInfo
++poolBackPressure
poolInfo = info
})
- for (let i = 0; i < Math.pow(numberOfWorkers, 2); i++) {
+ for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.add(pool.execute())
}
+ // console.log(pool.info.backPressure)
await Promise.all(promises)
+ // console.log(pool.info.backPressure)
expect(poolBackPressure).toBe(1)
expect(poolInfo).toStrictEqual({
version,
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: true,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: true,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: false
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
maxQueued: 0,
failed: 0
},
- runTime: {
- aggregate: expect.any(Number),
- maximum: expect.any(Number),
- minimum: expect.any(Number),
- average: expect.any(Number),
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
- },
+ }),
waitTime: {
history: expect.any(CircularArray)
},
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
- expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
- expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
maxQueued: 0,
failed: 0
},
- runTime: {
- aggregate: expect.any(Number),
- maximum: expect.any(Number),
- minimum: expect.any(Number),
- median: expect.any(Number),
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
- },
+ }),
waitTime: {
history: expect.any(CircularArray)
},
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
- expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
- expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.median == null) {
+ expect(workerNode.usage.runTime.median).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
await pool.destroy()
pool = new DynamicThreadPool(
{ workerChoiceStrategy }
)
expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
- useDynamicWorker: true
+ dynamicWorkerUsage: false,
+ dynamicWorkerReady: true
})
// We need to clean up the resources after our test
await pool.destroy()
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: maxMultiplier,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,