X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=30b4d01f9846cd2c2327d5a52e4a6be594e8d62c;hb=23a6c28d3cc6edb778653a4d9d9a4d36f9a961c3;hp=56dd7fa9296265078f863d342338da0457309919;hpb=aad6fb64175759059c70ddcded4a01b2accd9e8c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 56dd7fa9..30b4d01f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,7 +10,6 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, - isAsyncFunction, isKillBehavior, isPlainObject, median, @@ -335,16 +334,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -618,9 +621,10 @@ export abstract class AbstractPool< */ protected internalBusy (): boolean { return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 ) } @@ -643,14 +647,14 @@ export abstract class AbstractPool< workerNodeKey }) if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number)) ) { - this.enqueueTask(workerNodeKey, task) - } else { this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) } this.checkAndEmitEvents() }) @@ -659,16 +663,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) - await workerExitPromise }) ) } @@ -678,9 +674,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode ( - workerNodeKey: number - ): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -910,6 +904,7 @@ export abstract class AbstractPool< message.workerId ) const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && @@ -919,17 +914,7 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this) - if (isAsyncFunction(destroyWorkerNodeBounded)) { - ( - destroyWorkerNodeBounded as (workerNodeKey: number) => Promise - )(localWorkerNodeKey).catch(EMPTY_FUNCTION) - } else { - (destroyWorkerNodeBounded as (workerNodeKey: number) => void)( - localWorkerNodeKey - ) - } + this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) @@ -1050,10 +1035,10 @@ export abstract class AbstractPool< return message => { this.checkMessageWorkerId(message) if (message.ready != null) { - // Worker ready response received + // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.id != null) { - // Task execution response received + // Task execution response received from worker this.handleTaskExecutionResponse(message) } } @@ -1173,7 +1158,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey,