repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' of github.com:poolifier/poolifier into interleaved-weighted...
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index ab93fc007b698f56b029aa4043182c87b330ab3b..4c82ef08a8cdad70602ebf20a64235b1cb3833f0 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-146,6
+146,9
@@
export abstract class AbstractPool<
this.opts.workerChoiceStrategyOptions =
opts.workerChoiceStrategyOptions ??
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
this.opts.workerChoiceStrategyOptions =
opts.workerChoiceStrategyOptions ??
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.checkValidWorkerChoiceStrategyOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
@@
-179,6
+182,14
@@
export abstract class AbstractPool<
'Invalid worker choice strategy options: must be a plain object'
)
}
'Invalid worker choice strategy options: must be a plain object'
)
}
+ if (
+ workerChoiceStrategyOptions.weights != null &&
+ Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+ ) {
+ throw new Error(
+ 'Invalid worker choice strategy options: must have a weight for each worker node'
+ )
+ }
}
private checkValidTasksQueueOptions (
}
private checkValidTasksQueueOptions (
@@
-329,7
+340,7
@@
export abstract class AbstractPool<
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
- const
[workerNodeKey, workerNode]
= this.chooseWorkerNode()
+ const
workerNodeKey
= this.chooseWorkerNode()
const submittedTask: Task<Data> = {
name,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const submittedTask: Task<Data> = {
name,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@
-340,7
+351,7
@@
export abstract class AbstractPool<
this.promiseResponseMap.set(submittedTask.id as string, {
resolve,
reject,
this.promiseResponseMap.set(submittedTask.id as string, {
resolve,
reject,
- worker:
workerNode
.worker
+ worker:
this.workerNodes[workerNodeKey]
.worker
})
})
if (
})
})
if (
@@
-354,6
+365,7
@@
export abstract class AbstractPool<
} else {
this.executeTask(workerNodeKey, submittedTask)
}
} else {
this.executeTask(workerNodeKey, submittedTask)
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
@@
-428,22
+440,24
@@
export abstract class AbstractPool<
workerTasksUsage.avgRunTime =
workerTasksUsage.runTime / workerTasksUsage.run
}
workerTasksUsage.avgRunTime =
workerTasksUsage.runTime / workerTasksUsage.run
}
- if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
- workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
+ message.runTime != null
+ ) {
+ workerTasksUsage.runTimeHistory.push(message.runTime)
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
- this.workerChoiceStrategyContext.update(workerNodeKey)
}
/**
* Chooses a worker node for the next task.
*
}
/**
* Chooses a worker node for the next task.
*
- * The default uses a round robin algorithm to distribute the load.
+ * The default
worker choice strategy
uses a round robin algorithm to distribute the load.
*
*
- * @returns
[worker node key, worker node].
+ * @returns
The worker node key
*/
*/
- protected chooseWorkerNode ():
[number, WorkerNode<Worker, Data>]
{
+ protected chooseWorkerNode ():
number
{
let workerNodeKey: number
if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
let workerNodeKey: number
if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
@@
-463,7
+477,7
@@
export abstract class AbstractPool<
} else {
workerNodeKey = this.workerChoiceStrategyContext.execute()
}
} else {
workerNodeKey = this.workerChoiceStrategyContext.execute()
}
- return
[workerNodeKey, this.workerNodes[workerNodeKey]]
+ return
workerNodeKey
}
/**
}
/**