X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=0b359d93b6006c124c692fda70f9fb21926691f4;hb=10ecf8fd5f751ffb5477284ae1b6935f3c81ec2d;hp=086d7ee82550bc40c46781b83616c1639b32155b;hpb=aa9eede876376ffbb6d8585192f3865849252f5c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 086d7ee8..0b359d93 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,6 +10,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + isAsyncFunction, isKillBehavior, isPlainObject, median, @@ -114,6 +115,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -913,7 +915,16 @@ export abstract class AbstractPool< this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorkerNode(localWorkerNodeKey) as Promise) + const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this) + if (isAsyncFunction(destroyWorkerNodeBounded)) { + ( + destroyWorkerNodeBounded as (workerNodeKey: number) => Promise + )(localWorkerNodeKey).catch(EMPTY_FUNCTION) + } else { + (destroyWorkerNodeBounded as (workerNodeKey: number) => void)( + localWorkerNodeKey + ) + } } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -985,6 +996,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -992,6 +1004,9 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if (workerNode.usage.tasks.executing === 0) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -1004,10 +1019,17 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } }