X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8363ecba9f8d3c98d9071023e99f4b7b47053e7a;hb=dd6a5ce306e8ffc7dccf8734c79e2ecf10d4d59b;hp=c3565446f254c0292978f1f3eed1c23c1b1ac267;hpb=930dcf12982091cb5658b5a44acd47676fbae631;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c3565446..8363ecba 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -379,6 +379,11 @@ export abstract class AbstractPool< */ protected abstract get busy (): boolean + /** + * Whether worker nodes are executing at least one task. + * + * @returns Worker nodes busyness boolean status. + */ protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { @@ -434,7 +439,7 @@ export abstract class AbstractPool< } /** - * Shutdowns the given worker. + * Terminates the given worker. * * @param worker - A worker within `workerNodes`. */ @@ -609,18 +614,28 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * * @returns The worker node key */ - protected chooseWorkerNode (): number { + private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - return this.getWorkerNodeKey(this.createAndSetupDynamicWorker()) + const worker = this.createAndSetupDynamicWorker() + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + ) { + return this.getWorkerNodeKey(worker) + } } return this.workerChoiceStrategyContext.execute() } - protected shallCreateDynamicWorker (): boolean { + /** + * Conditions for dynamic worker creation. + * + * @returns Whether to create a dynamic worker or not. + */ + private shallCreateDynamicWorker (): boolean { return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } @@ -646,7 +661,9 @@ export abstract class AbstractPool< >(worker: Worker, listener: (message: MessageValue) => void): void /** - * Returns a newly created worker. + * Creates a new worker. + * + * @returns Newly created worker. */ protected abstract createWorker (): Worker @@ -704,12 +721,15 @@ export abstract class AbstractPool< if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && - this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing === - 0) + ((this.opts.enableTasksQueue === false && + this.workerNodes[currentWorkerNodeKey].workerUsage.tasks + .executing === 0) || + (this.opts.enableTasksQueue === true && + this.workerNodes[currentWorkerNodeKey].workerUsage.tasks + .executing === 0 && + this.tasksQueueSize(currentWorkerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueue(currentWorkerNodeKey) - // FIXME: wait for tasks to be finished void (this.destroyWorker(worker) as Promise) } }) @@ -746,6 +766,7 @@ export abstract class AbstractPool< workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + this.workerChoiceStrategyContext.update(workerNodeKey) } } }