X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=fd8628100fe560b11c1b48765c0102d9bdb806e5;hb=301b5d97a2c9bcd278819b4f81c42c949da66d63;hp=75b5cf796c8a6c198c48bb8ad843bd6eae33ba29;hpb=78099a150dc54d7adab495195fa5f133fd54e114;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 75b5cf79..fd862810 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,9 +4,9 @@ import { EMPTY_FUNCTION, median } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' import { PoolEvents, type PoolOptions } from './pool' import { PoolEmitter } from './pool' -import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal' +import type { IPoolInternal } from './pool-internal' import { PoolType } from './pool-internal' -import type { IPoolWorker } from './pool-worker' +import type { IWorker, Task, TasksUsage, WorkerNode } from './worker' import { WorkerChoiceStrategies, type WorkerChoiceStrategy @@ -22,12 +22,12 @@ import { CircularArray } from '../circular-array' * @typeParam Response - Type of response of execution. This can only be serializable data. */ export abstract class AbstractPool< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > implements IPoolInternal { /** @inheritDoc */ - public readonly workers: Array> = [] + public readonly workerNodes: Array> = [] /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -75,7 +75,7 @@ export abstract class AbstractPool< this.checkFilePath(this.filePath) this.checkPoolOptions(this.opts) - this.chooseWorker.bind(this) + this.chooseWorkerNode.bind(this) this.internalExecute.bind(this) this.checkAndEmitFull.bind(this) this.checkAndEmitBusy.bind(this) @@ -152,13 +152,15 @@ export abstract class AbstractPool< } /** - * Gets the given worker key. + * Gets the given worker its worker node key. * * @param worker - The worker. - * @returns The worker key if the worker is found in the pool, `-1` otherwise. + * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerKey (worker: Worker): number { - return this.workers.findIndex(workerItem => workerItem.worker === worker) + private getWorkerNodeKey (worker: Worker): number { + return this.workerNodes.findIndex( + workerNode => workerNode.worker === worker + ) } /** @inheritDoc */ @@ -167,16 +169,21 @@ export abstract class AbstractPool< ): void { this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy - for (const [index, workerItem] of this.workers.entries()) { - this.setWorker(index, workerItem.worker, { - run: 0, - running: 0, - runTime: 0, - runTimeHistory: new CircularArray(), - avgRunTime: 0, - medRunTime: 0, - error: 0 - }) + for (const [index, workerNode] of this.workerNodes.entries()) { + this.setWorkerNode( + index, + workerNode.worker, + { + run: 0, + running: 0, + runTime: 0, + runTimeHistory: new CircularArray(), + avgRunTime: 0, + medRunTime: 0, + error: 0 + }, + workerNode.tasksQueue + ) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( workerChoiceStrategy @@ -192,29 +199,37 @@ export abstract class AbstractPool< protected internalBusy (): boolean { return ( this.numberOfRunningTasks >= this.numberOfWorkers && - this.findFreeWorkerKey() === -1 + this.findFreeWorkerNodeKey() === -1 ) } /** @inheritDoc */ - public findFreeWorkerKey (): number { - return this.workers.findIndex(workerItem => { - return workerItem.tasksUsage.running === 0 + public findFreeWorkerNodeKey (): number { + return this.workerNodes.findIndex(workerNode => { + return workerNode.tasksUsage?.running === 0 }) } /** @inheritDoc */ public async execute (data: Data): Promise { - const [workerKey, worker] = this.chooseWorker() - const messageId = crypto.randomUUID() - const res = this.internalExecute(workerKey, worker, messageId) - this.checkAndEmitFull() - this.checkAndEmitBusy() - this.sendToWorker(worker, { + const [workerNodeKey, workerNode] = this.chooseWorkerNode() + const submittedTask: Task = { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), - id: messageId - }) + id: crypto.randomUUID() + } + const res = this.internalExecute(workerNodeKey, workerNode, submittedTask) + let currentTask: Task + // FIXME: Add sensible conditions to start tasks queuing on the worker node + if (this.tasksQueueLength(workerNodeKey) > 0) { + currentTask = this.dequeueTask(workerNodeKey) as Task + this.enqueueTask(workerNodeKey, submittedTask) + } else { + currentTask = submittedTask + } + this.sendToWorker(workerNode.worker, currentTask) + this.checkAndEmitFull() + this.checkAndEmitBusy() // eslint-disable-next-line @typescript-eslint/return-await return res } @@ -222,22 +237,21 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workers.map(async workerItem => { - await this.destroyWorker(workerItem.worker) + this.workerNodes.map(async workerNode => { + await this.destroyWorker(workerNode.worker) }) ) } /** - * Shutdowns given worker in the pool. + * Shutdowns the given worker. * - * @param worker - A worker within `workers`. + * @param worker - A worker within `workerNodes`. */ protected abstract destroyWorker (worker: Worker): void | Promise /** - * Setup hook that can be overridden by a Poolifier pool implementation - * to run code before workers are created in the abstract constructor. + * Setup hook to run code before worker node are created in the abstract constructor. * Can be overridden * * @virtual @@ -255,10 +269,10 @@ export abstract class AbstractPool< * Hook executed before the worker task promise resolution. * Can be overridden. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. */ - protected beforePromiseResponseHook (workerKey: number): void { - ++this.workers[workerKey].tasksUsage.running + protected beforePromiseResponseHook (workerNodeKey: number): void { + ++this.workerNodes[workerNodeKey].tasksUsage.running } /** @@ -295,35 +309,35 @@ export abstract class AbstractPool< } /** - * Chooses a worker for the next task. + * Chooses a worker node for the next task. * * The default uses a round robin algorithm to distribute the load. * - * @returns [worker key, worker]. + * @returns [worker node key, worker node]. */ - protected chooseWorker (): [number, Worker] { - let workerKey: number + protected chooseWorkerNode (): [number, WorkerNode] { + let workerNodeKey: number if ( this.type === PoolType.DYNAMIC && !this.full && - this.findFreeWorkerKey() === -1 + this.findFreeWorkerNodeKey() === -1 ) { - const createdWorker = this.createAndSetupWorker() - this.registerWorkerMessageListener(createdWorker, message => { + const workerCreated = this.createAndSetupWorker() + this.registerWorkerMessageListener(workerCreated, message => { if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && - this.getWorkerTasksUsage(createdWorker)?.running === 0) + this.getWorkerTasksUsage(workerCreated)?.running === 0) ) { // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(createdWorker) + void this.destroyWorker(workerCreated) } }) - workerKey = this.getWorkerKey(createdWorker) + workerNodeKey = this.getWorkerNodeKey(workerCreated) } else { - workerKey = this.workerChoiceStrategyContext.execute() + workerNodeKey = this.workerChoiceStrategyContext.execute() } - return [workerKey, this.workers[workerKey].worker] + return [workerNodeKey, this.workerNodes[workerNodeKey]] } /** @@ -338,7 +352,7 @@ export abstract class AbstractPool< ): void /** - * Registers a listener callback on a given worker. + * Registers a listener callback on the given worker. * * @param worker - The worker which should register a listener. * @param listener - The message listener callback. @@ -353,17 +367,16 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Function that can be hooked up when a worker has been newly created and moved to the workers registry. + * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. * * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default. * * @param worker - The newly created worker. - * @virtual */ protected abstract afterWorkerSetup (worker: Worker): void /** - * Creates a new worker for this pool and sets it up completely. + * Creates a new worker and sets it up completely in the pool worker nodes. * * @returns New, completely set up worker. */ @@ -375,18 +388,10 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { - this.removeWorker(worker) + this.removeWorkerNode(worker) }) - this.pushWorker(worker, { - run: 0, - running: 0, - runTime: 0, - runTimeHistory: new CircularArray(), - avgRunTime: 0, - medRunTime: 0, - error: 0 - }) + this.pushWorkerNode(worker) this.afterWorkerSetup(worker) @@ -417,13 +422,17 @@ export abstract class AbstractPool< } private async internalExecute ( - workerKey: number, - worker: Worker, - messageId: string + workerNodeKey: number, + workerNode: WorkerNode, + task: Task ): Promise { - this.beforePromiseResponseHook(workerKey) + this.beforePromiseResponseHook(workerNodeKey) return await new Promise((resolve, reject) => { - this.promiseResponseMap.set(messageId, { resolve, reject, worker }) + this.promiseResponseMap.set(task.id, { + resolve, + reject, + worker: workerNode.worker + }) }) } @@ -444,58 +453,82 @@ export abstract class AbstractPool< } /** - * Gets the given worker tasks usage in the pool. + * Gets the given worker its tasks usage in the pool. * * @param worker - The worker. * @returns The worker tasks usage. */ private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined { - const workerKey = this.getWorkerKey(worker) - if (workerKey !== -1) { - return this.workers[workerKey].tasksUsage + const workerNodeKey = this.getWorkerNodeKey(worker) + if (workerNodeKey !== -1) { + return this.workerNodes[workerNodeKey].tasksUsage } - throw new Error('Worker could not be found in the pool') + throw new Error('Worker could not be found in the pool worker nodes') } /** - * Pushes the given worker in the pool. + * Pushes the given worker in the pool worker nodes. * * @param worker - The worker. - * @param tasksUsage - The worker tasks usage. + * @returns The worker nodes length. */ - private pushWorker (worker: Worker, tasksUsage: TasksUsage): void { - this.workers.push({ + private pushWorkerNode (worker: Worker): number { + return this.workerNodes.push({ worker, - tasksUsage + tasksUsage: { + run: 0, + running: 0, + runTime: 0, + runTimeHistory: new CircularArray(), + avgRunTime: 0, + medRunTime: 0, + error: 0 + }, + tasksQueue: [] }) } /** - * Sets the given worker in the pool. + * Sets the given worker in the pool worker nodes. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. * @param worker - The worker. * @param tasksUsage - The worker tasks usage. + * @param tasksQueue - The worker task queue. */ - private setWorker ( - workerKey: number, + private setWorkerNode ( + workerNodeKey: number, worker: Worker, - tasksUsage: TasksUsage + tasksUsage: TasksUsage, + tasksQueue: Array> ): void { - this.workers[workerKey] = { + this.workerNodes[workerNodeKey] = { worker, - tasksUsage + tasksUsage, + tasksQueue } } /** - * Removes the given worker from the pool. + * Removes the given worker from the pool worker nodes. * - * @param worker - The worker that will be removed. + * @param worker - The worker. */ - protected removeWorker (worker: Worker): void { - const workerKey = this.getWorkerKey(worker) - this.workers.splice(workerKey, 1) - this.workerChoiceStrategyContext.remove(workerKey) + protected removeWorkerNode (worker: Worker): void { + const workerNodeKey = this.getWorkerNodeKey(worker) + this.workerNodes.splice(workerNodeKey, 1) + this.workerChoiceStrategyContext.remove(workerNodeKey) + } + + protected enqueueTask (workerNodeKey: number, task: Task): void { + this.workerNodes[workerNodeKey].tasksQueue.push(task) + } + + protected dequeueTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].tasksQueue.shift() + } + + protected tasksQueueLength (workerNodeKey: number): number { + return this.workerNodes[workerNodeKey].tasksQueue.length } }