DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
+ isAsyncFunction,
isKillBehavior,
isPlainObject,
median,
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.executeTask = this.executeTask.bind(this)
this.enqueueTask = this.enqueueTask.bind(this)
+ this.dequeueTask = this.dequeueTask.bind(this)
this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
if (this.opts.enableEvents === true) {
protected checkDynamicPoolSize (min: number, max: number): void {
if (this.type === PoolTypes.dynamic) {
- if (min > max) {
+ if (max == null) {
+ throw new Error(
+ 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
+ )
+ } else if (!Number.isSafeInteger(max)) {
+ throw new TypeError(
+ 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
+ )
+ } else if (min > max) {
throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void (this.destroyWorkerNode(localWorkerNodeKey) as Promise<void>)
+ const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
+ if (isAsyncFunction(destroyWorkerNodeBounded)) {
+ (
+ destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
+ )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
+ } else {
+ (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
+ localWorkerNodeKey
+ )
+ }
}
})
const workerInfo = this.getWorkerInfo(workerNodeKey)
while (this.tasksQueueSize(workerNodeKey) > 0) {
let targetWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
+ let executeTask = false
for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
const workerInfo = this.getWorkerInfo(workerNodeId)
if (
workerInfo.ready &&
workerNode.usage.tasks.queued === 0
) {
+ if (
+ this.workerNodes[workerNodeId].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ executeTask = true
+ }
targetWorkerNodeKey = workerNodeId
break
}
targetWorkerNodeKey = workerNodeId
}
}
- this.enqueueTask(
- targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ if (executeTask) {
+ this.executeTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ } else {
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
}
}