* The promise response map.
*
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker key, the promise resolve and reject callbacks.
+ * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
*
* When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message.
*/
- protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
- new Map<string, PromiseResponseWrapper<Response>>()
+ protected promiseResponseMap: Map<
+ string,
+ PromiseResponseWrapper<Worker, Response>
+ > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
/**
* Worker choice strategy instance implementing the worker choice algorithm.
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
this,
() => {
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, message => {
+ const createdWorker = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(createdWorker, message => {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
- this.getWorkerTasksUsage(workerCreated)?.running === 0
+ this.getWorkerTasksUsage(createdWorker)?.running === 0
) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.destroyWorker(workerCreated)
+ void this.destroyWorker(createdWorker)
}
})
- return workerCreated
+ return this.getWorkerKey(createdWorker)
},
this.opts.workerChoiceStrategy
)
workerChoiceStrategy: WorkerChoiceStrategy
): void {
this.opts.workerChoiceStrategy = workerChoiceStrategy
- for (const workerItem of this.workers) {
- this.setWorker(workerItem.worker, {
+ for (const [index, workerItem] of this.workers.entries()) {
+ this.setWorker(index, workerItem.worker, {
run: 0,
running: 0,
runTime: 0,
protected internalGetBusyStatus (): boolean {
return (
this.numberOfRunningTasks >= this.numberOfWorkers &&
- this.findFreeWorker() === false
+ this.findFreeWorkerKey() === false
)
}
/** {@inheritDoc} */
- public findFreeWorker (): Worker | false {
- for (const workerItem of this.workers) {
- if (workerItem.tasksUsage.running === 0) {
- // A worker is free, return the matching worker
- return workerItem.worker
- }
- }
- return false
+ public findFreeWorkerKey (): number | false {
+ const freeWorkerKey = this.workers.findIndex(workerItem => {
+ return workerItem.tasksUsage.running === 0
+ })
+ return freeWorkerKey !== -1 ? freeWorkerKey : false
}
/** {@inheritDoc} */
public async execute (data: Data): Promise<Response> {
- const worker = this.chooseWorker()
+ const [workerKey, worker] = this.chooseWorker()
const messageId = crypto.randomUUID()
- const res = this.internalExecute(this.getWorkerKey(worker), messageId)
+ const res = this.internalExecute(workerKey, worker, messageId)
this.checkAndEmitBusy()
this.sendToWorker(worker, {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
* Hook executed after the worker task promise resolution.
* Can be overridden.
*
- * @param workerKey - The worker key.
+ * @param worker - The worker.
* @param message - The received message.
*/
protected afterPromiseResponseHook (
- workerKey: number,
+ worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage = this.workers[workerKey].tasksUsage
+ const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
--workerTasksUsage.running
++workerTasksUsage.run
if (message.error != null) {
*
* The default implementation uses a round robin algorithm to distribute the load.
*
- * @returns Worker.
+ * @returns [worker key, worker].
*/
- protected chooseWorker (): Worker {
- return this.workerChoiceStrategyContext.execute()
+ protected chooseWorker (): [number, Worker] {
+ const workerKey = this.workerChoiceStrategyContext.execute()
+ return [workerKey, this.workers[workerKey].worker]
}
/**
this.removeWorker(worker)
})
- this.setWorker(worker, {
+ this.pushWorker(worker, {
run: 0,
running: 0,
runTime: 0,
} else {
promiseResponse.resolve(message.data as Response)
}
- this.afterPromiseResponseHook(promiseResponse.workerKey, message)
+ this.afterPromiseResponseHook(promiseResponse.worker, message)
this.promiseResponseMap.delete(message.id)
}
}
private async internalExecute (
workerKey: number,
+ worker: Worker,
messageId: string
): Promise<Response> {
this.beforePromiseResponseHook(workerKey)
return await new Promise<Response>((resolve, reject) => {
- this.promiseResponseMap.set(messageId, { resolve, reject, workerKey })
+ this.promiseResponseMap.set(messageId, { resolve, reject, worker })
})
}
}
}
- /** {@inheritDoc} */
- public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
+ /**
+ * Gets worker tasks usage.
+ *
+ * @param worker - The worker.
+ * @returns The worker tasks usage.
+ */
+ private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
const workerKey = this.getWorkerKey(worker)
if (workerKey !== -1) {
return this.workers[workerKey].tasksUsage
}
/**
- * Sets the given worker.
+ * Pushes the given worker.
*
* @param worker - The worker.
* @param tasksUsage - The worker tasks usage.
*/
- private setWorker (worker: Worker, tasksUsage: TasksUsage): void {
+ private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
this.workers.push({
worker,
tasksUsage
})
}
+
+ /**
+ * Sets the given worker.
+ *
+ * @param workerKey - The worker key.
+ * @param worker - The worker.
+ * @param tasksUsage - The worker tasks usage.
+ */
+ private setWorker (
+ workerKey: number,
+ worker: Worker,
+ tasksUsage: TasksUsage
+ ): void {
+ this.workers[workerKey] = {
+ worker,
+ tasksUsage
+ }
+ }
}
readonly numberOfRunningTasks: number
/**
- * Finds a free worker based on the number of tasks the worker has applied.
+ * Finds a free worker key based on the number of tasks the worker has applied.
*
- * If a worker is found with `0` running tasks, it is detected as free and returned.
+ * If a worker is found with `0` running tasks, it is detected as free and its key is returned.
*
* If no free worker is found, `false` is returned.
*
- * @returns A free worker if there is one, otherwise `false`.
+ * @returns A worker key if there is one, otherwise `false`.
*/
- findFreeWorker: () => Worker | false
-
- /**
- * Gets worker tasks usage.
- *
- * @param worker - The worker.
- * @returns The tasks usage on the worker.
- */
- getWorkerTasksUsage: (worker: Worker) => TasksUsage | undefined
+ findFreeWorkerKey: () => number | false
}
Worker extends IPoolWorker,
Data,
Response
-> implements IWorkerChoiceStrategy<Worker> {
+> implements IWorkerChoiceStrategy {
/** {@inheritDoc} */
public readonly isDynamicPool: boolean
/** {@inheritDoc} */
public abstract reset (): boolean
/** {@inheritDoc} */
- public abstract choose (): Worker
+ public abstract choose (): number
}
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
- private readonly workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+ private readonly workerChoiceStrategy: IWorkerChoiceStrategy
/**
* Constructs a worker choice strategy for dynamic pool.
*
* @param pool - The pool instance.
- * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool.
- * @param workerChoiceStrategy - The worker choice strategy when the pull is busy.
+ * @param createWorkerCallback - The worker creation callback for dynamic pool.
+ * @param workerChoiceStrategy - The worker choice strategy when the pool is busy.
*/
public constructor (
pool: IPoolInternal<Worker, Data, Response>,
- private readonly createDynamicallyWorkerCallback: () => Worker,
+ private readonly createWorkerCallback: () => number,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
) {
super(pool)
}
/** {@inheritDoc} */
- public choose (): Worker {
- const freeWorker = this.pool.findFreeWorker()
- if (freeWorker !== false) {
- return freeWorker
+ public choose (): number {
+ const freeWorkerKey = this.pool.findFreeWorkerKey()
+ if (freeWorkerKey !== false) {
+ return freeWorkerKey
}
if (this.pool.busy) {
}
// All workers are busy, create a new worker
- return this.createDynamicallyWorkerCallback()
+ return this.createWorkerCallback()
}
}
* Worker last virtual task execution timestamp.
*/
private readonly workerLastVirtualTaskTimestamp: Map<
- Worker,
+ number,
WorkerVirtualTaskTimestamp
- > = new Map<Worker, WorkerVirtualTaskTimestamp>()
+ > = new Map<number, WorkerVirtualTaskTimestamp>()
/** {@inheritDoc} */
public reset (): boolean {
}
/** {@inheritDoc} */
- public choose (): Worker {
+ public choose (): number {
let minWorkerVirtualTaskEndTimestamp = Infinity
- let chosenWorker!: Worker
- for (const workerItem of this.pool.workers) {
- const worker = workerItem.worker
- this.computeWorkerLastVirtualTaskTimestamp(worker)
+ let chosenWorkerKey!: number
+ for (const [index] of this.pool.workers.entries()) {
+ this.computeWorkerLastVirtualTaskTimestamp(index)
const workerLastVirtualTaskEndTimestamp =
- this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
+ this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0
if (
workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
) {
minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
- chosenWorker = worker
+ chosenWorkerKey = index
}
}
- return chosenWorker
+ return chosenWorkerKey
}
/**
* Computes worker last virtual task timestamp.
*
- * @param worker - The worker.
+ * @param workerKey - The worker key.
*/
- private computeWorkerLastVirtualTaskTimestamp (worker: Worker): void {
+ private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void {
const workerVirtualTaskStartTimestamp = Math.max(
Date.now(),
- this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? -Infinity
+ this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity
)
- this.workerLastVirtualTaskTimestamp.set(worker, {
+ this.workerLastVirtualTaskTimestamp.set(workerKey, {
start: workerVirtualTaskStartTimestamp,
end:
workerVirtualTaskStartTimestamp +
- (this.pool.getWorkerTasksUsage(worker)?.avgRunTime ?? 0)
+ (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0)
})
}
}
}
/** {@inheritDoc} */
- public choose (): Worker {
+ public choose (): number {
let minRunTime = Infinity
- let lessBusyWorker!: Worker
- for (const workerItem of this.pool.workers) {
- const worker = workerItem.worker
- const workerRunTime = this.pool.getWorkerTasksUsage(worker)
- ?.runTime as number
+ let lessBusyWorkerKey!: number
+ for (const [index, workerItem] of this.pool.workers.entries()) {
+ const workerRunTime = workerItem.tasksUsage.runTime
if (!this.isDynamicPool && workerRunTime === 0) {
- return worker
+ return index
} else if (workerRunTime < minRunTime) {
minRunTime = workerRunTime
- lessBusyWorker = worker
+ lessBusyWorkerKey = index
}
}
- return lessBusyWorker
+ return lessBusyWorkerKey
}
}
}
/** {@inheritDoc} */
- public choose (): Worker {
+ public choose (): number {
let minNumberOfTasks = Infinity
- let lessUsedWorker!: Worker
- for (const workerItem of this.pool.workers) {
- const worker = workerItem.worker
- const tasksUsage = this.pool.getWorkerTasksUsage(worker)
- const workerTasks =
- (tasksUsage?.run as number) + (tasksUsage?.running as number)
+ let lessUsedWorkerKey!: number
+ for (const [index, workerItem] of this.pool.workers.entries()) {
+ const tasksUsage = workerItem.tasksUsage
+ const workerTasks = tasksUsage?.run + tasksUsage?.running
if (!this.isDynamicPool && workerTasks === 0) {
- return worker
+ return index
} else if (workerTasks < minNumberOfTasks) {
minNumberOfTasks = workerTasks
- lessUsedWorker = worker
+ lessUsedWorkerKey = index
}
}
- return lessUsedWorker
+ return lessUsedWorkerKey
}
}
}
/** {@inheritDoc} */
- public choose (): Worker {
- const chosenWorker = this.pool.workers[this.nextWorkerId].worker
+ public choose (): number {
+ const chosenWorkerKey = this.nextWorkerId
this.nextWorkerId =
this.nextWorkerId === this.pool.workers.length - 1
? 0
: this.nextWorkerId + 1
- return chosenWorker
+ return chosenWorkerKey
}
}
-import type { IPoolWorker } from '../pool-worker'
-
/**
* Enumeration of worker choice strategies.
*/
/**
* Worker choice strategy interface.
- *
- * @typeParam Worker - Type of worker which manages the strategy.
*/
-export interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
+export interface IWorkerChoiceStrategy {
/**
* Is the pool attached to the strategy dynamic?.
*/
*/
reset: () => boolean
/**
- * Chooses a worker in the pool.
+ * Chooses a worker in the pool and returns its key.
*/
- choose: () => Worker
+ choose: () => number
}
> (
pool: IPoolInternal<Worker, Data, Response>,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-): IWorkerChoiceStrategy<Worker> {
+): IWorkerChoiceStrategy {
switch (workerChoiceStrategy) {
case WorkerChoiceStrategies.ROUND_ROBIN:
- return new RoundRobinWorkerChoiceStrategy(pool)
+ return new RoundRobinWorkerChoiceStrategy<Worker, Data, Response>(pool)
case WorkerChoiceStrategies.LESS_USED:
- return new LessUsedWorkerChoiceStrategy(pool)
+ return new LessUsedWorkerChoiceStrategy<Worker, Data, Response>(pool)
case WorkerChoiceStrategies.LESS_BUSY:
- return new LessBusyWorkerChoiceStrategy(pool)
+ return new LessBusyWorkerChoiceStrategy<Worker, Data, Response>(pool)
case WorkerChoiceStrategies.FAIR_SHARE:
- return new FairShareWorkerChoiceStrategy(pool)
+ return new FairShareWorkerChoiceStrategy<Worker, Data, Response>(pool)
case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
- return new WeightedRoundRobinWorkerChoiceStrategy(pool)
+ return new WeightedRoundRobinWorkerChoiceStrategy<Worker, Data, Response>(
+ pool
+ )
default:
throw new Error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
/**
* Per worker virtual task runtime map.
*/
- private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
- Worker,
+ private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
+ number,
TaskRunTime
>()
}
/** {@inheritDoc} */
- public choose (): Worker {
- const chosenWorker = this.pool.workers[this.currentWorkerId].worker
- if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) {
- this.initWorkerTaskRunTime(chosenWorker)
+ public choose (): number {
+ const chosenWorkerKey = this.currentWorkerId
+ if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
+ this.initWorkerTaskRunTime(chosenWorkerKey)
}
const workerTaskRunTime =
- this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0
+ this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
const workerTaskWeight =
- this.workersTaskRunTime.get(chosenWorker)?.weight ??
+ this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
this.defaultWorkerWeight
if (workerTaskRunTime < workerTaskWeight) {
this.setWorkerTaskRunTime(
- chosenWorker,
+ chosenWorkerKey,
workerTaskWeight,
workerTaskRunTime +
- (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
+ (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
)
} else {
this.currentWorkerId =
this.currentWorkerId === this.pool.workers.length - 1
? 0
: this.currentWorkerId + 1
- this.setWorkerTaskRunTime(
- this.pool.workers[this.currentWorkerId].worker,
- workerTaskWeight,
- 0
- )
+ this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
}
- return chosenWorker
+ return chosenWorkerKey
}
private initWorkersTaskRunTime (): void {
- for (const workerItem of this.pool.workers) {
- this.initWorkerTaskRunTime(workerItem.worker)
+ for (const [index] of this.pool.workers.entries()) {
+ this.initWorkerTaskRunTime(index)
}
}
- private initWorkerTaskRunTime (worker: Worker): void {
- this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
+ private initWorkerTaskRunTime (workerKey: number): void {
+ this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
}
private setWorkerTaskRunTime (
- worker: Worker,
+ workerKey: number,
weight: number,
runTime: number
): void {
- this.workersTaskRunTime.set(worker, {
+ this.workersTaskRunTime.set(workerKey, {
weight,
runTime
})
}
- private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
- return this.pool.getWorkerTasksUsage(worker)?.avgRunTime
+ private getWorkerVirtualTaskRunTime (workerKey: number): number {
+ return this.pool.workers[workerKey].tasksUsage.avgRunTime
}
private computeWorkerWeight (): number {
Data,
Response
> {
- private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+ private workerChoiceStrategy!: IWorkerChoiceStrategy
/**
* Worker choice strategy context constructor.
*
* @param pool - The pool instance.
- * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool.
+ * @param createWorkerCallback - The worker creation callback for dynamic pool.
* @param workerChoiceStrategy - The worker choice strategy.
*/
public constructor (
private readonly pool: IPoolInternal<Worker, Data, Response>,
- private readonly createDynamicallyWorkerCallback: () => Worker,
+ private readonly createWorkerCallback: () => number,
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
) {
this.setWorkerChoiceStrategy(workerChoiceStrategy)
*/
private getPoolWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ): IWorkerChoiceStrategy<Worker> {
+ ): IWorkerChoiceStrategy {
if (this.pool.type === PoolType.DYNAMIC) {
return new DynamicPoolWorkerChoiceStrategy(
this.pool,
- this.createDynamicallyWorkerCallback,
+ this.createWorkerCallback,
workerChoiceStrategy
)
}
*
* @returns The worker choice strategy.
*/
- public getWorkerChoiceStrategy (): IWorkerChoiceStrategy<Worker> {
+ public getWorkerChoiceStrategy (): IWorkerChoiceStrategy {
return this.workerChoiceStrategy
}
/**
* Chooses a worker with the underlying selection strategy.
*
- * @returns The chosen one.
+ * @returns The key of the chosen one.
*/
- public execute (): Worker {
+ public execute (): number {
return this.workerChoiceStrategy.choose()
}
}
import type { Worker as ClusterWorker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
import type { KillBehavior } from './worker/worker-options'
+import type { IPoolWorker } from './pools/pool-worker'
/**
* Make all properties in T non-readonly.
*
* @typeParam Response - Type of execution response. This can only be serializable data.
*/
-export interface PromiseResponseWrapper<Response = unknown> {
+export interface PromiseResponseWrapper<
+ Worker extends IPoolWorker,
+ Response = unknown
+> {
/**
* Resolve callback to fulfill the promise.
*/
*/
readonly reject: (reason?: string) => void
/**
- * The worker handling the promise key .
+ * The worker handling the promise.
*/
- readonly workerKey: number
+ readonly worker: Worker
}
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
removeAllWorker () {
this.workers = []
- this.promiseMap.clear()
+ this.promiseResponseMap.clear()
}
}
class StubPoolWithIsMain extends FixedThreadPool {
expect(workerItem.tasksUsage.running).toBe(0)
expect(workerItem.tasksUsage.runTime).toBe(0)
expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+ expect(workerItem.tasksUsage.error).toBe(0)
}
await pool.destroy()
})
expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
expect(workerItem.tasksUsage.runTime).toBe(0)
expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+ expect(workerItem.tasksUsage.error).toBe(0)
}
await Promise.all(promises)
for (const workerItem of pool.workers) {
expect(workerItem.tasksUsage.running).toBe(0)
expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerItem.tasksUsage.error).toBe(0)
}
await pool.destroy()
})
expect(workerItem.tasksUsage.running).toBe(0)
expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerItem.tasksUsage.error).toBe(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerItem of pool.workers) {
expect(workerItem.tasksUsage.running).toBe(0)
expect(workerItem.tasksUsage.runTime).toBe(0)
expect(workerItem.tasksUsage.avgRunTime).toBe(0)
+ expect(workerItem.tasksUsage.error).toBe(0)
}
await pool.destroy()
})
)
let results = new Set()
for (let i = 0; i < max; i++) {
- results.add(pool.chooseWorker().id)
+ results.add(pool.chooseWorker()[1].id)
}
expect(results.size).toBe(max)
await pool.destroy()
pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
results = new Set()
for (let i = 0; i < max; i++) {
- results.add(pool.chooseWorker().threadId)
+ results.add(pool.chooseWorker()[1].threadId)
}
expect(results.size).toBe(max)
await pool.destroy()
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.FAIR_SHARE
)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerLastVirtualTaskTimestamp.get(worker).start
+ .workerLastVirtualTaskTimestamp.get(workerKey).start
).toBe(0)
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerLastVirtualTaskTimestamp.get(worker).end
+ .workerLastVirtualTaskTimestamp.get(workerKey).end
).toBe(0)
}
// We need to clean up the resources after our test
.workerLastVirtualTaskTimestamp
).toBeUndefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerLastVirtualTaskTimestamp.get(worker).start
+ .workerLastVirtualTaskTimestamp.get(workerKey).start
).toBe(0)
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerLastVirtualTaskTimestamp.get(worker).end
+ .workerLastVirtualTaskTimestamp.get(workerKey).end
).toBe(0)
}
await pool.destroy()
.workerChoiceStrategy.workerLastVirtualTaskTimestamp
).toBeUndefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workerChoiceStrategy.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).start
+ .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey)
+ .start
).toBe(0)
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).end
+ .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey)
+ .end
).toBe(0)
}
// We need to clean up the resources after our test
pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
.defaultWorkerWeight
).toBeGreaterThan(0)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workersTaskRunTime.get(worker).weight
+ .workersTaskRunTime.get(workerKey).weight
).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workersTaskRunTime.get(worker).runTime
+ .workersTaskRunTime.get(workerKey).runTime
).toBe(0)
}
// We need to clean up the resources after our test
pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
.defaultWorkerWeight
).toBeGreaterThan(0)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workersTaskRunTime.get(worker).runTime
+ .workersTaskRunTime.get(workerKey).runTime
).toBe(0)
}
await pool.destroy()
pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
.workerChoiceStrategy.defaultWorkerWeight
).toBeGreaterThan(0)
- for (const worker of pool.workerChoiceStrategyContext
+ for (const workerKey of pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
.workerChoiceStrategy.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext
.getWorkerChoiceStrategy()
- .workerChoiceStrategy.workersTaskRunTime.get(worker).runTime
+ .workerChoiceStrategy.workersTaskRunTime.get(workerKey).runTime
).toBe(0)
}
// We need to clean up the resources after our test
const WorkerChoiceStrategyStub = sinon.createStubInstance(
RoundRobinWorkerChoiceStrategy,
{
- choose: sinon.stub().returns('worker')
+ choose: sinon.stub().returns(0)
}
)
workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub
- const chosenWorker = workerChoiceStrategyContext.execute()
+ const chosenWorkerKey = workerChoiceStrategyContext.execute()
expect(
workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce
).toBe(true)
- expect(chosenWorker).toBe('worker')
+ expect(chosenWorkerKey).toBe(0)
})
it('Verify that execute() return the worker chosen by the strategy with dynamic pool', () => {
const WorkerChoiceStrategyStub = sinon.createStubInstance(
RoundRobinWorkerChoiceStrategy,
{
- choose: sinon.stub().returns('worker')
+ choose: sinon.stub().returns(0)
}
)
workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub
- const chosenWorker = workerChoiceStrategyContext.execute()
+ const chosenWorkerKey = workerChoiceStrategyContext.execute()
expect(
workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce
).toBe(true)
- expect(chosenWorker).toBe('worker')
+ expect(chosenWorkerKey).toBe(0)
})
it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {