X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6c9a0fd185f7c560f4f7fa5ac597ab784f6d6dbd;hb=2431bdb4c2dc637169bf623a40fc6562f685e56e;hp=53bce7d2b7523f6a819cc5fd2dba3cbf5c84cc6a;hpb=7c8381eb33aaff689afe1944d8643508003cf0b1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 53bce7d2..6c9a0fd1 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -153,7 +153,19 @@ export abstract class AbstractPool< 'Cannot instantiate a pool with a negative number of workers' ) } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { - throw new Error('Cannot instantiate a fixed pool with no worker') + throw new RangeError('Cannot instantiate a fixed pool with zero worker') + } + } + + protected checkDynamicPoolSize (min: number, max: number): void { + if (this.type === PoolTypes.dynamic && min > max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' + ) + } else if (this.type === PoolTypes.dynamic && min === max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' + ) } } @@ -252,6 +264,8 @@ export abstract class AbstractPool< version, type: this.type, worker: this.worker, + ready: this.ready, + strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, maxSize: this.maxSize, ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -381,6 +395,19 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return ( + !this.full || + (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready)) + ) + } + + private get ready (): boolean { + return ( + this.full && this.workerNodes.every(workerNode => workerNode.info.ready) + ) + } + /** * Gets the approximate pool utilization. * @@ -864,6 +891,13 @@ export abstract class AbstractPool< protected afterWorkerSetup (worker: Worker): void { // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) } /** @@ -883,7 +917,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(worker) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() } else { @@ -899,8 +933,6 @@ export abstract class AbstractPool< this.pushWorkerNode(worker) - this.setWorkerStatistics(worker) - this.afterWorkerSetup(worker) return worker @@ -941,7 +973,6 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorker (): Worker { const worker = this.createAndSetupWorker() - this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true this.registerWorkerMessageListener(worker, message => { const workerNodeKey = this.getWorkerNodeKey(worker) if ( @@ -957,6 +988,7 @@ export abstract class AbstractPool< void (this.destroyWorker(worker) as Promise) } }) + this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true this.sendToWorker(worker, { checkAlive: true }) return worker } @@ -968,7 +1000,7 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.workerId != null && message.ready != null) { + if (message.ready != null && message.workerId != null) { // Worker ready message received this.handleWorkerReadyMessage(message) } else if (message.id != null) { @@ -990,6 +1022,9 @@ export abstract class AbstractPool< }'` ) } + if (this.emitter != null && this.ready) { + this.emitter.emit(PoolEvents.ready, this.info) + } } private handleTaskExecutionResponse (message: MessageValue): void {