X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=fb1bf8458ad7a970f27ae7a4dcbb300faf5bf848;hb=dbd73092cca6cabb2b41e18b944656fc43f8757b;hp=fea2e3a1f4cec5b5c8f6187515f707524dc41dfe;hpb=563b18cd8fe9c211f15e9da91a299f2819abea01;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index fea2e3a1..fb1bf845 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,14 +92,14 @@ export abstract class AbstractPool< */ protected readonly max?: number - /** - * Whether the pool is starting or not. - */ - private readonly starting: boolean /** * Whether the pool is started or not. */ private started: boolean + /** + * Whether the pool is starting or not. + */ + private starting: boolean /** * The start timestamp of the pool. */ @@ -145,10 +145,11 @@ export abstract class AbstractPool< this.setupHook() - this.starting = true - this.startPool() + this.started = false this.starting = false - this.started = true + if (this.opts.startWorkers === true) { + this.start() + } this.startTimestamp = performance.now() } @@ -212,6 +213,7 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { + this.opts.startWorkers = opts.startWorkers ?? true this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) @@ -314,11 +316,6 @@ export abstract class AbstractPool< `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` ) } - if (tasksQueueOptions?.queueMaxSize != null) { - throw new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - } if ( tasksQueueOptions?.size != null && !Number.isSafeInteger(tasksQueueOptions?.size) @@ -334,24 +331,13 @@ export abstract class AbstractPool< } } - private startPool (): void { - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.numberOfWorkers - ) { - this.createAndSetupWorkerNode() - } - } - /** @inheritDoc */ public get info (): PoolInfo { return { version, type: this.type, worker: this.worker, + started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, @@ -675,7 +661,9 @@ export abstract class AbstractPool< return { ...{ size: Math.pow(this.maxSize, 2), - concurrency: 1 + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true }, ...tasksQueueOptions } @@ -751,7 +739,7 @@ export abstract class AbstractPool< ): Promise { return await new Promise((resolve, reject) => { if (!this.started) { - reject(new Error('Cannot execute a task on destroyed pool')) + reject(new Error('Cannot execute a task on not started pool')) return } if (name != null && typeof name !== 'string') { @@ -772,14 +760,13 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: workerInfo.id as number, + workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -799,6 +786,22 @@ export abstract class AbstractPool< }) } + /** @inheritdoc */ + public start (): void { + this.starting = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.numberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.starting = false + this.started = true + } + /** @inheritDoc */ public async destroy (): Promise { await Promise.all( @@ -1072,8 +1075,8 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.error, error) if ( this.opts.restartWorkerOnError === true && - !this.starting && - this.started + this.started && + !this.starting ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1167,10 +1170,14 @@ export abstract class AbstractPool< // Send the statistics message to worker. this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { - this.workerNodes[workerNodeKey].onEmptyQueue = - this.taskStealingOnEmptyQueue.bind(this) - this.workerNodes[workerNodeKey].onBackPressure = - this.tasksStealingOnBackPressure.bind(this) + if (this.opts.tasksQueueOptions?.taskStealing === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + } + if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } }