X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=192725082942b1aa7b8904c07eaf8a2731865f45;hb=984910dc4377061815ef8cb33a6dc91dfc35be3c;hp=4b1a05ff63f7a9b78ecc66c8541b272cad18a61b;hpb=18fea9f10a9d1fd8bbbc4738284939868faa0db5;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4b1a05ff..19272508 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,8 +10,6 @@ import { round } from '../utils' import { KillBehaviors } from '../worker/worker-options' -import { CircularArray } from '../circular-array' -import { Queue } from '../queue' import { type IPool, PoolEmitter, @@ -20,16 +18,15 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions, - type WorkerType, - WorkerTypes + type TasksQueueOptions } from './pool' import type { IWorker, + IWorkerNode, MessageHandler, Task, WorkerInfo, - WorkerNode, + WorkerType, WorkerUsage } from './worker' import { @@ -40,6 +37,7 @@ import { } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' import { version } from './version' +import { WorkerNode } from './worker-node' /** * Base class that implements some shared logic for all poolifier pools. @@ -54,7 +52,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: Array> = [] /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -155,7 +153,25 @@ export abstract class AbstractPool< 'Cannot instantiate a pool with a negative number of workers' ) } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { - throw new Error('Cannot instantiate a fixed pool with no worker') + throw new RangeError('Cannot instantiate a fixed pool with zero worker') + } + } + + protected checkDynamicPoolSize (min: number, max: number): void { + if (this.type === PoolTypes.dynamic) { + if (min > max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' + ) + } else if (min === 0 && max === 0) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero' + ) + } else if (min === max) { + throw new RangeError( + 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' + ) + } } } @@ -254,6 +270,8 @@ export abstract class AbstractPool< version, type: this.type, worker: this.worker, + ready: this.ready, + strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, minSize: this.minSize, maxSize: this.maxSize, ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -383,6 +401,19 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return ( + !this.full || + (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready)) + ) + } + + private get ready (): boolean { + return ( + this.full && this.workerNodes.every(workerNode => workerNode.info.ready) + ) + } + /** * Gets the approximate pool utilization. * @@ -437,6 +468,17 @@ export abstract class AbstractPool< ?.worker } + private checkMessageWorkerId (message: MessageValue): void { + if ( + message.workerId != null && + this.getWorkerById(message.workerId) == null + ) { + throw new Error( + `Worker message received from unknown worker '${message.workerId}'` + ) + } + } + /** * Gets the given worker its worker node key. * @@ -463,10 +505,7 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage( - workerNode, - this.getInitialWorkerUsage(workerNode.worker) - ) + workerNode.resetUsage() this.setWorkerStatistics(workerNode.worker) } } @@ -551,6 +590,7 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, + workerId: this.getWorkerInfo(workerNodeKey).id as number, id: randomUUID() } const res = new Promise((resolve, reject) => { @@ -869,6 +909,13 @@ export abstract class AbstractPool< protected afterWorkerSetup (worker: Worker): void { // Listen to worker messages. this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) } /** @@ -888,7 +935,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(worker) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() } else { @@ -904,8 +951,6 @@ export abstract class AbstractPool< this.pushWorkerNode(worker) - this.setWorkerStatistics(worker) - this.afterWorkerSetup(worker) return worker @@ -946,7 +991,6 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorker (): Worker { const worker = this.createAndSetupWorker() - this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true this.registerWorkerMessageListener(worker, message => { const workerNodeKey = this.getWorkerNodeKey(worker) if ( @@ -962,7 +1006,12 @@ export abstract class AbstractPool< void (this.destroyWorker(worker) as Promise) } }) - this.sendToWorker(worker, { dynamic: true }) + const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker)) + workerInfo.dynamic = true + this.sendToWorker(worker, { + checkAlive: true, + workerId: workerInfo.id as number + }) return worker } @@ -973,9 +1022,10 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.workerId != null && message.started != null) { - // Worker started message received - this.handleWorkerStartedMessage(message) + this.checkMessageWorkerId(message) + if (message.ready != null && message.workerId != null) { + // Worker ready message received + this.handleWorkerReadyMessage(message) } else if (message.id != null) { // Task execution response received this.handleTaskExecutionResponse(message) @@ -983,18 +1033,12 @@ export abstract class AbstractPool< } } - private handleWorkerStartedMessage (message: MessageValue): void { - // Worker started message received - const worker = this.getWorkerById(message.workerId as number) - if (worker != null) { - this.workerNodes[this.getWorkerNodeKey(worker)].info.started = - message.started as boolean - } else { - throw new Error( - `Worker started message received from unknown worker '${ - message.workerId as number - }'` - ) + private handleWorkerReadyMessage (message: MessageValue): void { + const worker = this.getWorkerById(message.workerId) + this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready = + message.ready as boolean + if (this.emitter != null && this.ready) { + this.emitter.emit(PoolEvents.ready, this.info) } } @@ -1036,19 +1080,6 @@ export abstract class AbstractPool< } } - /** - * Sets the given worker node its tasks usage in the pool. - * - * @param workerNode - The worker node. - * @param workerUsage - The worker usage. - */ - private setWorkerNodeTasksUsage ( - workerNode: WorkerNode, - workerUsage: WorkerUsage - ): void { - workerNode.usage = workerUsage - } - /** * Gets the worker information. * @@ -1065,57 +1096,9 @@ export abstract class AbstractPool< * @returns The worker nodes length. */ private pushWorkerNode (worker: Worker): number { - this.workerNodes.push({ - worker, - info: this.getInitialWorkerInfo(worker), - usage: this.getInitialWorkerUsage(), - tasksQueue: new Queue>() - }) - this.setWorkerNodeTasksUsage( - this.workerNodes[this.getWorkerNodeKey(worker)], - this.getInitialWorkerUsage(worker) - ) - return this.workerNodes.length + return this.workerNodes.push(new WorkerNode(worker, this.worker)) } - /** - * Gets the worker id. - * - * @param worker - The worker. - * @returns The worker id. - */ - private getWorkerId (worker: Worker): number | undefined { - if (this.worker === WorkerTypes.thread) { - return worker.threadId - } else if (this.worker === WorkerTypes.cluster) { - return worker.id - } - } - - // /** - // * Sets the given worker in the pool worker nodes. - // * - // * @param workerNodeKey - The worker node key. - // * @param worker - The worker. - // * @param workerInfo - The worker info. - // * @param workerUsage - The worker usage. - // * @param tasksQueue - The worker task queue. - // */ - // private setWorkerNode ( - // workerNodeKey: number, - // worker: Worker, - // workerInfo: WorkerInfo, - // workerUsage: WorkerUsage, - // tasksQueue: Queue> - // ): void { - // this.workerNodes[workerNodeKey] = { - // worker, - // info: workerInfo, - // usage: workerUsage, - // tasksQueue - // } - // } - /** * Removes the given worker from the pool worker nodes. * @@ -1135,19 +1118,15 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task) + return this.workerNodes[workerNodeKey].enqueueTask(task) } private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].tasksQueue.dequeue() + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.size - } - - private tasksMaxQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.maxSize + return this.workerNodes[workerNodeKey].tasksQueueSize() } private flushTasksQueue (workerNodeKey: number): void { @@ -1157,7 +1136,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } - this.workerNodes[workerNodeKey].tasksQueue.clear() + this.workerNodes[workerNodeKey].clearTasksQueue() } private flushTasksQueues (): void { @@ -1174,53 +1153,8 @@ export abstract class AbstractPool< .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate - } - }) - } - - private getInitialWorkerUsage (worker?: Worker): WorkerUsage { - const getTasksQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - return this.tasksQueueSize(this.getWorkerNodeKey(worker)) - } - const getTasksMaxQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker)) - } - return { - tasks: { - executed: 0, - executing: 0, - get queued (): number { - return getTasksQueueSize(worker) - }, - get maxQueued (): number { - return getTasksMaxQueueSize(worker) - }, - failed: 0 }, - runTime: { - history: new CircularArray() - }, - waitTime: { - history: new CircularArray() - }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } - } - } - - private getInitialWorkerInfo (worker: Worker): WorkerInfo { - return { id: this.getWorkerId(worker), dynamic: false, started: true } + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) } }