worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ workerInfo.ready = false
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
if (this.opts.restartWorkerOnError === true && !this.starting) {
- if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
+ if (workerInfo.dynamic) {
this.createAndSetupDynamicWorker()
} else {
this.createAndSetupWorker()
}
}
if (this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(worker)
+ this.redistributeQueuedTasks(workerNodeKey)
}
})
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
return worker
}
- private redistributeQueuedTasks (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
+ private redistributeQueuedTasks (workerNodeKey: number): void {
while (this.tasksQueueSize(workerNodeKey) > 0) {
let targetWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
// private toggleFindLastFreeWorkerNodeKey: boolean = false
/**
- * Id of the next worker node.
+ * The next worker node key.
*/
- protected nextWorkerNodeId: number = 0
+ protected nextWorkerNodeKey: number = 0
/** @inheritDoc */
public readonly strategyPolicy: StrategyPolicy = {
this.setTaskStatisticsRequirements(this.opts)
}
- protected workerNodeReady (workerNodeKey: number): boolean {
+ protected isWorkerNodeReady (workerNodeKey: number): boolean {
return this.pool.workerNodes[workerNodeKey].info.ready
}
/** @inheritDoc */
public choose (): number {
+ this.fairShareNextWorkerNodeKey()
+ return this.nextWorkerNodeKey
+ }
+
+ /** @inheritDoc */
+ public remove (workerNodeKey: number): boolean {
+ this.workersVirtualTaskEndTimestamp.splice(workerNodeKey, 1)
+ return true
+ }
+
+ private fairShareNextWorkerNodeKey (): void {
let minWorkerVirtualTaskEndTimestamp = Infinity
for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) {
const workerVirtualTaskEndTimestamp =
this.workersVirtualTaskEndTimestamp[workerNodeKey]
if (
- this.workerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeReady(workerNodeKey) &&
workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
) {
minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
- this.nextWorkerNodeId = workerNodeKey
+ this.nextWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeId
- }
-
- /** @inheritDoc */
- public remove (workerNodeKey: number): boolean {
- this.workersVirtualTaskEndTimestamp.splice(workerNodeKey, 1)
- return true
}
/**
/** @inheritDoc */
public reset (): boolean {
- this.nextWorkerNodeId = 0
+ this.nextWorkerNodeKey = 0
this.roundId = 0
return true
}
roundIndex++
) {
for (
- let workerNodeKey = this.nextWorkerNodeId;
+ let workerNodeKey = this.nextWorkerNodeKey;
workerNodeKey < this.pool.workerNodes.length;
workerNodeKey++
) {
}
}
this.roundId = roundId ?? 0
- this.nextWorkerNodeId = workerNodeId ?? 0
- const chosenWorkerNodeKey = this.nextWorkerNodeId
- if (this.nextWorkerNodeId === this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeId = 0
+ this.nextWorkerNodeKey = workerNodeId ?? 0
+ const chosenWorkerNodeKey = this.nextWorkerNodeKey
+ if (this.nextWorkerNodeKey === this.pool.workerNodes.length - 1) {
+ this.nextWorkerNodeKey = 0
this.roundId =
this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
} else {
- this.nextWorkerNodeId = this.nextWorkerNodeId + 1
+ this.nextWorkerNodeKey = this.nextWorkerNodeKey + 1
}
return chosenWorkerNodeKey
}
/** @inheritDoc */
public remove (workerNodeKey: number): boolean {
- if (this.nextWorkerNodeId === workerNodeKey) {
+ if (this.nextWorkerNodeKey === workerNodeKey) {
if (this.pool.workerNodes.length === 0) {
- this.nextWorkerNodeId = 0
- } else if (this.nextWorkerNodeId > this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeId = this.pool.workerNodes.length - 1
+ this.nextWorkerNodeKey = 0
+ } else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
+ this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
this.roundId =
this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
}
/** @inheritDoc */
public choose (): number {
+ this.leastBusyNextWorkerNodeKey()
+ return this.nextWorkerNodeKey
+ }
+
+ /** @inheritDoc */
+ public remove (): boolean {
+ return true
+ }
+
+ private leastBusyNextWorkerNodeKey (): void {
let minTime = Infinity
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerTime =
(workerNode.usage.runTime?.aggregate ?? 0) +
(workerNode.usage.waitTime?.aggregate ?? 0)
- if (this.workerNodeReady(workerNodeKey) && workerTime === 0) {
- this.nextWorkerNodeId = workerNodeKey
+ if (this.isWorkerNodeReady(workerNodeKey) && workerTime === 0) {
+ this.nextWorkerNodeKey = workerNodeKey
break
- } else if (this.workerNodeReady(workerNodeKey) && workerTime < minTime) {
+ } else if (
+ this.isWorkerNodeReady(workerNodeKey) &&
+ workerTime < minTime
+ ) {
minTime = workerTime
- this.nextWorkerNodeId = workerNodeKey
+ this.nextWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeId
- }
-
- /** @inheritDoc */
- public remove (): boolean {
- return true
}
}
/** @inheritDoc */
public choose (): number {
+ this.leastEluNextWorkerNodeKey()
+ return this.nextWorkerNodeKey
+ }
+
+ /** @inheritDoc */
+ public remove (): boolean {
+ return true
+ }
+
+ private leastEluNextWorkerNodeKey (): void {
let minWorkerElu = Infinity
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerUsage = workerNode.usage
const workerElu = workerUsage.elu?.active?.aggregate ?? 0
- if (this.workerNodeReady(workerNodeKey) && workerElu === 0) {
- this.nextWorkerNodeId = workerNodeKey
+ if (this.isWorkerNodeReady(workerNodeKey) && workerElu === 0) {
+ this.nextWorkerNodeKey = workerNodeKey
break
} else if (
- this.workerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeReady(workerNodeKey) &&
workerElu < minWorkerElu
) {
minWorkerElu = workerElu
- this.nextWorkerNodeId = workerNodeKey
+ this.nextWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeId
- }
-
- /** @inheritDoc */
- public remove (): boolean {
- return true
}
}
/** @inheritDoc */
public choose (): number {
+ this.leastUsedNextWorkerNodeKey()
+ return this.nextWorkerNodeKey
+ }
+
+ /** @inheritDoc */
+ public remove (): boolean {
+ return true
+ }
+
+ private leastUsedNextWorkerNodeKey (): void {
let minNumberOfTasks = Infinity
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerTaskStatistics = workerNode.usage.tasks
workerTaskStatistics.executed +
workerTaskStatistics.executing +
workerTaskStatistics.queued
- if (this.workerNodeReady(workerNodeKey) && workerTasks === 0) {
- this.nextWorkerNodeId = workerNodeKey
+ if (this.isWorkerNodeReady(workerNodeKey) && workerTasks === 0) {
+ this.nextWorkerNodeKey = workerNodeKey
break
} else if (
- this.workerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeReady(workerNodeKey) &&
workerTasks < minNumberOfTasks
) {
minNumberOfTasks = workerTasks
- this.nextWorkerNodeId = workerNodeKey
+ this.nextWorkerNodeKey = workerNodeKey
}
}
- return this.nextWorkerNodeId
- }
-
- /** @inheritDoc */
- public remove (): boolean {
- return true
}
}
/** @inheritDoc */
public reset (): boolean {
- this.nextWorkerNodeId = 0
+ this.nextWorkerNodeKey = 0
return true
}
/** @inheritDoc */
public choose (): number {
- const chosenWorkerNodeKey = this.nextWorkerNodeId
- this.nextWorkerNodeId =
- this.nextWorkerNodeId === this.pool.workerNodes.length - 1
- ? 0
- : this.nextWorkerNodeId + 1
+ const chosenWorkerNodeKey = this.nextWorkerNodeKey
+ this.roundRobinNextWorkerNodeKey()
return chosenWorkerNodeKey
}
/** @inheritDoc */
public remove (workerNodeKey: number): boolean {
- if (this.nextWorkerNodeId === workerNodeKey) {
+ if (this.nextWorkerNodeKey === workerNodeKey) {
if (this.pool.workerNodes.length === 0) {
- this.nextWorkerNodeId = 0
- } else if (this.nextWorkerNodeId > this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeId = this.pool.workerNodes.length - 1
+ this.nextWorkerNodeKey = 0
+ } else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
+ this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
}
}
return true
}
+
+ private roundRobinNextWorkerNodeKey (): void {
+ this.nextWorkerNodeKey =
+ this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+ ? 0
+ : this.nextWorkerNodeKey + 1
+ }
}
/** @inheritDoc */
public reset (): boolean {
- this.nextWorkerNodeId = 0
+ this.nextWorkerNodeKey = 0
this.workerVirtualTaskRunTime = 0
return true
}
/** @inheritDoc */
public choose (): number {
- const chosenWorkerNodeKey = this.nextWorkerNodeId
- const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
- const workerWeight =
- this.opts.weights?.[chosenWorkerNodeKey] ?? this.defaultWorkerWeight
- if (workerVirtualTaskRunTime < workerWeight) {
- this.workerVirtualTaskRunTime =
- workerVirtualTaskRunTime +
- this.getWorkerTaskRunTime(chosenWorkerNodeKey)
- } else {
- this.nextWorkerNodeId =
- this.nextWorkerNodeId === this.pool.workerNodes.length - 1
- ? 0
- : this.nextWorkerNodeId + 1
- this.workerVirtualTaskRunTime = 0
- }
+ const chosenWorkerNodeKey = this.nextWorkerNodeKey
+ this.weightedRoundRobinNextWorkerNodeKey()
return chosenWorkerNodeKey
}
/** @inheritDoc */
public remove (workerNodeKey: number): boolean {
- if (this.nextWorkerNodeId === workerNodeKey) {
+ if (this.nextWorkerNodeKey === workerNodeKey) {
if (this.pool.workerNodes.length === 0) {
- this.nextWorkerNodeId = 0
- } else if (this.nextWorkerNodeId > this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeId = this.pool.workerNodes.length - 1
+ this.nextWorkerNodeKey = 0
+ } else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
+ this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
}
this.workerVirtualTaskRunTime = 0
}
return true
}
+
+ private weightedRoundRobinNextWorkerNodeKey (): void {
+ const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
+ const workerWeight =
+ this.opts.weights?.[this.nextWorkerNodeKey] ?? this.defaultWorkerWeight
+ if (workerVirtualTaskRunTime < workerWeight) {
+ this.workerVirtualTaskRunTime =
+ workerVirtualTaskRunTime +
+ this.getWorkerTaskRunTime(this.nextWorkerNodeKey)
+ } else {
+ this.nextWorkerNodeKey =
+ this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+ ? 0
+ : this.nextWorkerNodeKey + 1
+ this.workerVirtualTaskRunTime = 0
+ }
+ }
}
expect(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeLessThan(longRunningPool.workerNodes.length)
// We need to clean up the resources after our test
await longRunningPool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
} else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
expect(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
await pool.destroy()
pool = new DynamicThreadPool(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
)
const resetResult = strategy.reset()
expect(resetResult).toBe(true)
- expect(strategy.nextWorkerNodeId).toBe(0)
+ expect(strategy.nextWorkerNodeKey).toBe(0)
expect(strategy.workerVirtualTaskRunTime).toBe(0)
})
})
expect(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeId
+ ).nextWorkerNodeKey
).toBeLessThan(longRunningPool.workerNodes.length)
// We need to clean up the resources after our test
await longRunningPool.destroy()