X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=48d0ed9afed3cd9071aaf1e854f598f4118810e1;hb=48487131ad37630e3021c3e4feee1b311d8bcd11;hp=fc18d6b6b4b0492b36fb8a168ec34fc5b96b0f6c;hpb=02fd3265f178909511619be1593c3ff9774c8685;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index fc18d6b6..48d0ed9a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,8 +10,6 @@ import { round } from '../utils' import { KillBehaviors } from '../worker/worker-options' -import { CircularArray } from '../circular-array' -import { Queue } from '../queue' import { type IPool, PoolEmitter, @@ -20,16 +18,15 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions, - type WorkerType, - WorkerTypes + type TasksQueueOptions } from './pool' import type { IWorker, + IWorkerNode, MessageHandler, Task, WorkerInfo, - WorkerNode, + WorkerType, WorkerUsage } from './worker' import { @@ -40,6 +37,7 @@ import { } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' import { version } from './version' +import { WorkerNode } from './worker-node' /** * Base class that implements some shared logic for all poolifier pools. @@ -54,7 +52,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: Array> = [] /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -301,31 +299,83 @@ export abstract class AbstractPool< ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.aggregate && { runTime: { - minimum: Math.min( - ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + minimum: round( + Math.min( + ...this.workerNodes.map( + workerNode => workerNode.usage.runTime?.minimum ?? Infinity + ) ) ), - maximum: Math.max( - ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + maximum: round( + Math.max( + ...this.workerNodes.map( + workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + ) ) - ) + ), + average: round( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.runTime?.aggregate ?? 0), + 0 + ) / + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.executed ?? 0), + 0 + ) + ), + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.median && { + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.runTime?.median ?? 0 + ) + ) + ) + }) } }), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.aggregate && { waitTime: { - minimum: Math.min( - ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + minimum: round( + Math.min( + ...this.workerNodes.map( + workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + ) ) ), - maximum: Math.max( - ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + maximum: round( + Math.max( + ...this.workerNodes.map( + workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + ) ) - ) + ), + average: round( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), + 0 + ) / + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.executed ?? 0), + 0 + ) + ), + ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .waitTime.median && { + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.waitTime?.median ?? 0 + ) + ) + ) + }) } }) } @@ -411,10 +461,7 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage( - workerNode, - this.getInitialWorkerUsage(workerNode.worker) - ) + workerNode.resetUsage() this.setWorkerStatistics(workerNode.worker) } } @@ -602,8 +649,9 @@ export abstract class AbstractPool< ): void { const workerTaskStatistics = workerUsage.tasks --workerTaskStatistics.executing - ++workerTaskStatistics.executed - if (message.taskError != null) { + if (message.taskError == null) { + ++workerTaskStatistics.executed + } else { ++workerTaskStatistics.failed } } @@ -633,8 +681,7 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregate / - (workerUsage.tasks.executed - workerUsage.tasks.failed) + workerUsage.runTime.aggregate / workerUsage.tasks.executed } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime @@ -673,8 +720,7 @@ export abstract class AbstractPool< workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregate / - (workerUsage.tasks.executed - workerUsage.tasks.failed) + workerUsage.waitTime.aggregate / workerUsage.tasks.executed } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -730,17 +776,14 @@ export abstract class AbstractPool< .average && workerUsage.tasks.executed !== 0 ) { - const executedTasks = - workerUsage.tasks.executed - workerUsage.tasks.failed workerUsage.elu.idle.average = - workerUsage.elu.idle.aggregate / executedTasks + workerUsage.elu.idle.aggregate / workerUsage.tasks.executed workerUsage.elu.active.average = - workerUsage.elu.active.aggregate / executedTasks + workerUsage.elu.active.aggregate / workerUsage.tasks.executed } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - .median && - message.taskPerformance?.elu != null + .median ) { workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle) workerUsage.elu.active.history.push( @@ -838,31 +881,7 @@ export abstract class AbstractPool< this.emitter.emit(PoolEvents.error, error) } if (this.opts.enableTasksQueue === true) { - const workerNodeKey = this.getWorkerNodeKey(worker) - while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break - } - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued < minQueuedTasks - ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId - } - } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } + this.redistributeQueuedTasks(worker) } if (this.opts.restartWorkerOnError === true) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { @@ -887,6 +906,34 @@ export abstract class AbstractPool< return worker } + private redistributeQueuedTasks (worker: Worker): void { + const workerNodeKey = this.getWorkerNodeKey(worker) + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } + /** * Creates a new dynamic worker and sets it up completely in the pool worker nodes. * @@ -910,6 +957,7 @@ export abstract class AbstractPool< void (this.destroyWorker(worker) as Promise) } }) + this.sendToWorker(worker, { checkAlive: true }) return worker } @@ -983,19 +1031,6 @@ export abstract class AbstractPool< } } - /** - * Sets the given worker node its tasks usage in the pool. - * - * @param workerNode - The worker node. - * @param workerUsage - The worker usage. - */ - private setWorkerNodeTasksUsage ( - workerNode: WorkerNode, - workerUsage: WorkerUsage - ): void { - workerNode.usage = workerUsage - } - /** * Gets the worker information. * @@ -1012,57 +1047,9 @@ export abstract class AbstractPool< * @returns The worker nodes length. */ private pushWorkerNode (worker: Worker): number { - this.workerNodes.push({ - worker, - info: this.getInitialWorkerInfo(worker), - usage: this.getInitialWorkerUsage(), - tasksQueue: new Queue>() - }) - this.setWorkerNodeTasksUsage( - this.workerNodes[this.getWorkerNodeKey(worker)], - this.getInitialWorkerUsage(worker) - ) - return this.workerNodes.length + return this.workerNodes.push(new WorkerNode(worker, this.worker)) } - /** - * Gets the worker id. - * - * @param worker - The worker. - * @returns The worker id. - */ - private getWorkerId (worker: Worker): number | undefined { - if (this.worker === WorkerTypes.thread) { - return worker.threadId - } else if (this.worker === WorkerTypes.cluster) { - return worker.id - } - } - - // /** - // * Sets the given worker in the pool worker nodes. - // * - // * @param workerNodeKey - The worker node key. - // * @param worker - The worker. - // * @param workerInfo - The worker info. - // * @param workerUsage - The worker usage. - // * @param tasksQueue - The worker task queue. - // */ - // private setWorkerNode ( - // workerNodeKey: number, - // worker: Worker, - // workerInfo: WorkerInfo, - // workerUsage: WorkerUsage, - // tasksQueue: Queue> - // ): void { - // this.workerNodes[workerNodeKey] = { - // worker, - // info: workerInfo, - // usage: workerUsage, - // tasksQueue - // } - // } - /** * Removes the given worker from the pool worker nodes. * @@ -1082,19 +1069,15 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task) + return this.workerNodes[workerNodeKey].enqueueTask(task) } private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].tasksQueue.dequeue() + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.size - } - - private tasksMaxQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.maxSize + return this.workerNodes[workerNodeKey].tasksQueueSize() } private flushTasksQueue (workerNodeKey: number): void { @@ -1104,7 +1087,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } - this.workerNodes[workerNodeKey].tasksQueue.clear() + this.workerNodes[workerNodeKey].clearTasksQueue() } private flushTasksQueues (): void { @@ -1124,60 +1107,4 @@ export abstract class AbstractPool< } }) } - - private getInitialWorkerUsage (worker?: Worker): WorkerUsage { - const getTasksQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - // FIXME: Workaround tasks queue initialization race issue. - try { - return this.tasksQueueSize(this.getWorkerNodeKey(worker)) - } catch { - return 0 - } - } - const getTasksMaxQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - // FIXME: Workaround tasks queue initialization race issue. - try { - return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker)) - } catch { - return 0 - } - } - return { - tasks: { - executed: 0, - executing: 0, - get queued (): number { - return getTasksQueueSize(worker) - }, - get maxQueued (): number { - return getTasksMaxQueueSize(worker) - }, - failed: 0 - }, - runTime: { - history: new CircularArray() - }, - waitTime: { - history: new CircularArray() - }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } - } - } - - private getInitialWorkerInfo (worker: Worker): WorkerInfo { - return { id: this.getWorkerId(worker), dynamic: false, started: true } - } }