X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=11cc9cc15ef419346827e5f627150f04e73ba158;hb=55d7d6002049be09a06b08da26febe2e8bfa494b;hp=670d490daa18c0cd4e3e03217df69275aa35b824;hpb=c329fd41c48904770df633b6d5ea2b3d37f3eafd;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 670d490d..11cc9cc1 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -55,6 +55,7 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, + getDefaultTasksQueueOptions, updateEluWorkerUsage, updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, @@ -519,18 +520,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -618,12 +607,7 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions(this.maxSize), ...tasksQueueOptions } } @@ -1021,6 +1005,10 @@ export abstract class AbstractPool< workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1050,7 +1038,13 @@ export abstract class AbstractPool< this.flagWorkerNodeAsNotReady(workerNodeKey) const flushedTasks = this.flushTasksQueue(workerNodeKey) const workerNode = this.workerNodes[workerNodeKey] - await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks) + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout + ) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1257,7 +1251,7 @@ export abstract class AbstractPool< if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1433,6 +1427,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } if (this.workerNodes.length <= 1) { return } @@ -1713,7 +1710,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.promiseResponseMap.delete(taskId as string) workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true) { + if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && @@ -1782,7 +1779,8 @@ export abstract class AbstractPool< env: this.opts.env, workerOptions: this.opts.workerOptions, tasksQueueBackPressureSize: - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions(this.maxSize).size } ) // Flag the worker node as ready at pool startup.