X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=0d3d21832a715191e6efad9723c6a47454ae2215;hb=a5ed75b7c39de907a0047f4c30f2ea219ca4f917;hp=086d7ee82550bc40c46781b83616c1639b32155b;hpb=aa9eede876376ffbb6d8585192f3865849252f5c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 086d7ee8..0d3d2183 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,6 +10,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + isAsyncFunction, isKillBehavior, isPlainObject, median, @@ -114,6 +115,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -171,7 +173,15 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) @@ -913,7 +923,16 @@ export abstract class AbstractPool< this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorkerNode(localWorkerNodeKey) as Promise) + const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this) + if (isAsyncFunction(destroyWorkerNodeBounded)) { + ( + destroyWorkerNodeBounded as (workerNodeKey: number) => Promise + )(localWorkerNodeKey).catch(EMPTY_FUNCTION) + } else { + (destroyWorkerNodeBounded as (workerNodeKey: number) => void)( + localWorkerNodeKey + ) + } } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -985,6 +1004,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -992,6 +1012,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -1004,10 +1030,17 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } }