Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
/** {@inheritDoc} */
- public readonly workers: Map<number, WorkerType<Worker>> = new Map<
- number,
- WorkerType<Worker>
- >()
+ public readonly workers: Array<WorkerType<Worker>> = []
/** {@inheritDoc} */
public readonly emitter?: PoolEmitter
- /**
- * Id of the next worker.
- */
- protected nextWorkerId: number = 0
-
/**
* The promise map.
*
* @param worker - The worker.
* @returns The worker key.
*/
- private getWorkerKey (worker: Worker): number | undefined {
- return [...this.workers].find(([, value]) => value.worker === worker)?.[0]
+ private getWorkerKey (worker: Worker): number {
+ return this.workers.findIndex(workerItem => workerItem.worker === worker)
}
/** {@inheritDoc} */
workerChoiceStrategy: WorkerChoiceStrategy
): void {
this.opts.workerChoiceStrategy = workerChoiceStrategy
- for (const [key, value] of this.workers) {
- this.setWorker(key, value.worker, {
+ for (const workerItem of this.workers) {
+ this.setWorker(workerItem.worker, {
run: 0,
running: 0,
runTime: 0,
/** {@inheritDoc} */
public findFreeWorker (): Worker | false {
- for (const value of this.workers.values()) {
- if (value.tasksUsage.running === 0) {
+ for (const workerItem of this.workers) {
+ if (workerItem.tasksUsage.running === 0) {
// A worker is free, return the matching worker
- return value.worker
+ return workerItem.worker
}
}
return false
/** {@inheritDoc} */
public async destroy (): Promise<void> {
await Promise.all(
- [...this.workers].map(async ([, value]) => {
- await this.destroyWorker(value.worker)
+ this.workers.map(async workerItem => {
+ await this.destroyWorker(workerItem.worker)
})
)
}
* @param worker - The worker that will be removed.
*/
protected removeWorker (worker: Worker): void {
- this.workers.delete(this.getWorkerKey(worker) as number)
- --this.nextWorkerId
+ this.workers.splice(this.getWorkerKey(worker), 1)
}
/**
this.removeWorker(worker)
})
- this.setWorker(this.nextWorkerId, worker, {
+ this.setWorker(worker, {
run: 0,
running: 0,
runTime: 0,
avgRunTime: 0
})
- ++this.nextWorkerId
this.afterWorkerSetup(worker)
/** {@inheritDoc} */
public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
const workerKey = this.getWorkerKey(worker)
- if (workerKey !== undefined) {
- return (this.workers.get(workerKey) as WorkerType<Worker>).tasksUsage
+ if (workerKey !== -1) {
+ return this.workers[workerKey].tasksUsage
}
throw new Error('Worker could not be found in the pool')
}
/**
* Sets the given worker.
*
- * @param workerKey - The worker key.
* @param worker - The worker.
* @param tasksUsage - The worker tasks usage.
*/
- private setWorker (
- workerKey: number,
- worker: Worker,
- tasksUsage: TasksUsage
- ): void {
- this.workers.set(workerKey, {
+ private setWorker (worker: Worker, tasksUsage: TasksUsage): void {
+ this.workers.push({
worker,
tasksUsage
})
/** {@inheritDoc} */
public get busy (): boolean {
- return this.workers.size === this.max
+ return this.workers.length === this.max
}
}
Response = unknown
> extends IPool<Data, Response> {
/**
- * Pool workers map.
+ * Pool workers item array.
*/
- readonly workers: Map<number, WorkerType<Worker>>
+ readonly workers: Array<WorkerType<Worker>>
/**
* Pool type.
public choose (): Worker {
let minWorkerVirtualTaskEndTimestamp = Infinity
let chosenWorker!: Worker
- for (const value of this.pool.workers.values()) {
- const worker = value.worker
+ for (const workerItem of this.pool.workers) {
+ const worker = workerItem.worker
this.computeWorkerLastVirtualTaskTimestamp(worker)
const workerLastVirtualTaskEndTimestamp =
this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
public choose (): Worker {
let minRunTime = Infinity
let lessBusyWorker!: Worker
- for (const value of this.pool.workers.values()) {
- const worker = value.worker
+ for (const workerItem of this.pool.workers) {
+ const worker = workerItem.worker
const workerRunTime = this.pool.getWorkerTasksUsage(worker)
?.runTime as number
if (!this.isDynamicPool && workerRunTime === 0) {
public choose (): Worker {
let minNumberOfTasks = Infinity
let lessUsedWorker!: Worker
- for (const value of this.pool.workers.values()) {
- const worker = value.worker
+ for (const workerItem of this.pool.workers) {
+ const worker = workerItem.worker
const tasksUsage = this.pool.getWorkerTasksUsage(worker)
const workerTasks =
(tasksUsage?.run as number) + (tasksUsage?.running as number)
/** {@inheritDoc} */
public choose (): Worker {
- const chosenWorker = this.pool.workers.get(this.nextWorkerId)
- ?.worker as Worker
+ const chosenWorker = this.pool.workers[this.nextWorkerId]?.worker
this.nextWorkerId =
- this.nextWorkerId === this.pool.workers.size - 1
+ this.nextWorkerId === this.pool.workers.length - 1
? 0
: this.nextWorkerId + 1
return chosenWorker
/** {@inheritDoc} */
public choose (): Worker {
- const chosenWorker = this.pool.workers.get(this.currentWorkerId)
- ?.worker as Worker
+ const chosenWorker = this.pool.workers[this.currentWorkerId]?.worker
if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) {
this.initWorkerTaskRunTime(chosenWorker)
}
)
} else {
this.currentWorkerId =
- this.currentWorkerId === this.pool.workers.size - 1
+ this.currentWorkerId === this.pool.workers.length - 1
? 0
: this.currentWorkerId + 1
this.setWorkerTaskRunTime(
- this.pool.workers.get(this.currentWorkerId)?.worker as Worker,
+ this.pool.workers[this.currentWorkerId]?.worker,
workerTaskWeight,
0
)
}
private initWorkersTaskRunTime (): void {
- for (const value of this.pool.workers.values()) {
- this.initWorkerTaskRunTime(value.worker)
+ for (const workerItem of this.pool.workers) {
+ this.initWorkerTaskRunTime(workerItem.worker)
}
}
/** {@inheritDoc} */
public get busy (): boolean {
- return this.workers.size === this.max
+ return this.workers.length === this.max
}
}
)
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
removeAllWorker () {
- this.workers = new Map()
+ this.workers = []
this.promiseMap.clear()
}
}
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js'
)
- for (const value of pool.workers.values()) {
- expect(value.tasksUsage).toBeDefined()
- expect(value.tasksUsage.run).toBe(0)
- expect(value.tasksUsage.running).toBe(0)
- expect(value.tasksUsage.runTime).toBe(0)
- expect(value.tasksUsage.avgRunTime).toBe(0)
+ for (const workerItem of pool.workers) {
+ expect(workerItem.tasksUsage).toBeDefined()
+ expect(workerItem.tasksUsage.run).toBe(0)
+ expect(workerItem.tasksUsage.running).toBe(0)
+ expect(workerItem.tasksUsage.runTime).toBe(0)
+ expect(workerItem.tasksUsage.avgRunTime).toBe(0)
}
await pool.destroy()
})
for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.push(pool.execute())
}
- for (const value of pool.workers.values()) {
- expect(value.tasksUsage).toBeDefined()
- expect(value.tasksUsage.run).toBe(0)
- expect(value.tasksUsage.running).toBe(numberOfWorkers * 2)
- expect(value.tasksUsage.runTime).toBe(0)
- expect(value.tasksUsage.avgRunTime).toBe(0)
+ for (const workerItem of pool.workers) {
+ expect(workerItem.tasksUsage).toBeDefined()
+ expect(workerItem.tasksUsage.run).toBe(0)
+ expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
+ expect(workerItem.tasksUsage.runTime).toBe(0)
+ expect(workerItem.tasksUsage.avgRunTime).toBe(0)
}
await Promise.all(promises)
- for (const value of pool.workers.values()) {
- expect(value.tasksUsage).toBeDefined()
- expect(value.tasksUsage.run).toBe(numberOfWorkers * 2)
- expect(value.tasksUsage.running).toBe(0)
- expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
- expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ for (const workerItem of pool.workers) {
+ expect(workerItem.tasksUsage).toBeDefined()
+ expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
+ expect(workerItem.tasksUsage.running).toBe(0)
+ expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+ expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
await pool.destroy()
})
promises.push(pool.execute())
}
await Promise.all(promises)
- for (const value of pool.workers.values()) {
- expect(value.tasksUsage).toBeDefined()
- expect(value.tasksUsage.run).toBe(numberOfWorkers * 2)
- expect(value.tasksUsage.running).toBe(0)
- expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
- expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ for (const workerItem of pool.workers) {
+ expect(workerItem.tasksUsage).toBeDefined()
+ expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
+ expect(workerItem.tasksUsage.running).toBe(0)
+ expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+ expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const value of pool.workers.values()) {
- expect(value.tasksUsage).toBeDefined()
- expect(value.tasksUsage.run).toBe(0)
- expect(value.tasksUsage.running).toBe(0)
- expect(value.tasksUsage.runTime).toBe(0)
- expect(value.tasksUsage.avgRunTime).toBe(0)
+ for (const workerItem of pool.workers) {
+ expect(workerItem.tasksUsage).toBeDefined()
+ expect(workerItem.tasksUsage.run).toBe(0)
+ expect(workerItem.tasksUsage.running).toBe(0)
+ expect(workerItem.tasksUsage.runTime).toBe(0)
+ expect(workerItem.tasksUsage.avgRunTime).toBe(0)
}
await pool.destroy()
})
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
- expect(pool.workers.size).toBeLessThanOrEqual(max)
- expect(pool.workers.size).toBeGreaterThan(min)
+ expect(pool.workers.length).toBeLessThanOrEqual(max)
+ expect(pool.workers.length).toBeGreaterThan(min)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
})
it('Verify scale worker up and down is working', async () => {
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.size).toBeGreaterThan(min)
+ expect(pool.workers.length).toBeGreaterThan(min)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.size).toBeGreaterThan(min)
+ expect(pool.workers.length).toBeGreaterThan(min)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
})
it('Shutdown test', async () => {
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
await TestUtils.waitExits(longRunningPool, max - min)
// Here we expect the workers to be at the max size since that the task is still running
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
await TestUtils.sleep(1500)
// Here we expect the workers to be at the max size since that the task is still running
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
- expect(pool.workers.size).toBeLessThanOrEqual(max)
- expect(pool.workers.size).toBeGreaterThan(min)
+ expect(pool.workers.length).toBeLessThanOrEqual(max)
+ expect(pool.workers.length).toBeGreaterThan(min)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
})
it('Verify scale thread up and down is working', async () => {
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.size).toBe(max)
+ expect(pool.workers.length).toBe(max)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.size).toBe(max)
+ expect(pool.workers.length).toBe(max)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.size).toBe(min)
+ expect(pool.workers.length).toBe(min)
})
it('Shutdown test', async () => {
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
await TestUtils.waitExits(longRunningPool, max - min)
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.size).toBe(min)
+ expect(longRunningPool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
await TestUtils.sleep(1500)
// Here we expect the workers to be at the max size since that the task is still running
- expect(longRunningPool.workers.size).toBe(max)
+ expect(longRunningPool.workers.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
static async waitExits (pool, numberOfExitEventsToWait) {
return new Promise(resolve => {
let exitEvents = 0
- for (const value of pool.workers.values()) {
- value.worker.on('exit', () => {
+ for (const workerItem of pool.workers) {
+ workerItem.worker.on('exit', () => {
++exitEvents
if (exitEvents === numberOfExitEventsToWait) {
resolve(exitEvents)