X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=635b8af459363dc022671f089492f155b3470695;hb=9b358e72dbd061a7b94708d7d3c64e9dbbefaab4;hp=282979852bf564de38942b3d8c86f1dddf0c1831;hpb=07e0c9e591f9fa6715ba94e52c647b7ee3d2b9c7;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 28297985..635b8af4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -56,7 +56,8 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics + updateMeasurementStatistics, + waitWorkerNodeEvents } from './utils' /** @@ -1045,9 +1046,9 @@ export abstract class AbstractPool< */ protected async destroyWorkerNode (workerNodeKey: number): Promise { this.flagWorkerNodeAsNotReady(workerNodeKey) - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished + const flushedTasks = this.flushTasksQueue(workerNodeKey) const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1285,7 +1286,6 @@ export abstract class AbstractPool< this.opts.errorHandler ?? EMPTY_FUNCTION ) workerNode.registerWorkerEventHandler('error', (error: Error) => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker) workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1301,7 +1301,7 @@ export abstract class AbstractPool< } } if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } workerNode.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) @@ -1312,7 +1312,7 @@ export abstract class AbstractPool< this.opts.exitHandler ?? EMPTY_FUNCTION ) workerNode.registerOnceWorkerEventHandler('exit', () => { - this.removeWorkerNode(workerNode.worker) + this.removeWorkerNode(workerNode) }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1740,6 +1740,7 @@ export abstract class AbstractPool< const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) asyncResource != null @@ -1758,8 +1759,9 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) + workerNode.emit('taskFinished', taskId) if (this.opts.enableTasksQueue === true) { - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < @@ -1775,7 +1777,7 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerNode.emit('idleWorkerNode', { workerId: workerId as number, workerNodeKey }) @@ -1854,12 +1856,12 @@ export abstract class AbstractPool< } /** - * Removes the worker node associated to the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1913,14 +1915,17 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void {