X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6b95be9482664deaa1131eced2ccbe3ccb48ca8a;hb=de868be6cc9bcfb6d341ffb14e6407c24a1a0e17;hp=9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96;hpb=42c677c1fd2cacd8bb79c8be0aa024ac93c7159a;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9b0ec92f..6b95be94 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1043,7 +1043,14 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + this.flushTasksQueue(workerNodeKey) + // FIXME: wait for tasks to be finished + const workerNode = this.workerNodes[workerNodeKey] + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1258,55 +1265,56 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): void - /** - * Creates a new worker. - * - * @returns Newly created worker. - */ - protected abstract createWorker (): Worker - /** * Creates a new, completely set up worker node. * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { - const worker = this.createWorker() - - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - this.flagWorkerNodeAsNotReady(workerNodeKey) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && !this.starting && !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else { this.createAndSetupWorkerNode() } } if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + workerNode.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode) }) - - const workerNodeKey = this.addWorkerNode(worker) - + const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) - return workerNodeKey } @@ -1806,23 +1814,38 @@ export abstract class AbstractPool< } /** - * Adds the given worker in the pool worker nodes. + * Creates a worker node. * - * @param worker - The worker. - * @returns The added worker node key. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + * @returns The created worker node. */ - private addWorkerNode (worker: Worker): number { + private createWorkerNode (): IWorkerNode { const workerNode = new WorkerNode( - worker, - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + } ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } + return workerNode + } + + /** + * Adds the given worker node in the pool worker nodes. + * + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + */ + private addWorkerNode (workerNode: IWorkerNode): number { this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey === -1) { throw new Error('Worker added not found in worker nodes') } @@ -1830,12 +1853,12 @@ export abstract class AbstractPool< } /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey)