From ea7a90d36354a4e1c833271571c6f3eb80428600 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 11 Oct 2022 22:35:06 +0200 Subject: [PATCH] Reset all internal statistics at worker choice strategy change MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/index.ts | 1 + src/pools/abstract-pool-worker.ts | 23 --------- src/pools/abstract-pool.ts | 49 +++++++++++++------ src/pools/pool-internal.ts | 12 ++++- src/pools/pool-worker.ts | 6 +-- .../abstract-worker-choice-strategy.ts | 9 ++-- .../dynamic-pool-worker-choice-strategy.ts | 9 +++- .../fair-share-worker-choice-strategy.ts | 18 ++++--- ...ss-recently-used-worker-choice-strategy.ts | 9 +++- .../round-robin-worker-choice-strategy.ts | 9 +++- .../selection-strategies-types.ts | 16 +++--- .../selection-strategies-utils.ts | 4 +- ...hted-round-robin-worker-choice-strategy.ts | 15 ++++-- .../worker-choice-strategy-context.ts | 5 +- src/utility-types.ts | 4 +- src/worker/abstract-worker.ts | 2 +- src/worker/worker-options.ts | 2 +- 17 files changed, 114 insertions(+), 79 deletions(-) delete mode 100644 src/pools/abstract-pool-worker.ts diff --git a/src/index.ts b/src/index.ts index b0c419ca..6c139a27 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,6 +6,7 @@ export type { ErrorHandler, ExitHandler, IPoolWorker, + MessageHandler, OnlineHandler } from './pools/pool-worker' export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types' diff --git a/src/pools/abstract-pool-worker.ts b/src/pools/abstract-pool-worker.ts deleted file mode 100644 index a27d03ef..00000000 --- a/src/pools/abstract-pool-worker.ts +++ /dev/null @@ -1,23 +0,0 @@ -import type { - ErrorHandler, - ExitHandler, - IPoolWorker, - MessageHandler, - OnlineHandler -} from './pool-worker' - -/** - * Basic class that implement the minimum required for a pool worker. - */ -export abstract class AbstractPoolWorker implements IPoolWorker { - /** @inheritDoc */ - abstract on (event: 'message', handler: MessageHandler): void - /** @inheritDoc */ - abstract on (event: 'error', handler: ErrorHandler): void - /** @inheritDoc */ - abstract on (event: 'online', handler: OnlineHandler): void - /** @inheritDoc */ - abstract on (event: 'exit', handler: ExitHandler): void - /** @inheritDoc */ - abstract once (event: 'exit', handler: ExitHandler): void -} diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 369e5b90..c4181e76 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,10 +4,10 @@ import type { } from '../utility-types' import { EMPTY_FUNCTION } from '../utils' import { isKillBehavior, KillBehaviors } from '../worker/worker-options' -import type { AbstractPoolWorker } from './abstract-pool-worker' import type { PoolOptions } from './pool' import type { IPoolInternal, TasksUsage } from './pool-internal' import { PoolEmitter, PoolType } from './pool-internal' +import type { IPoolWorker } from './pool-worker' import { WorkerChoiceStrategies, WorkerChoiceStrategy @@ -18,27 +18,22 @@ const WORKER_NOT_FOUND_TASKS_USAGE_MAP = 'Worker could not be found in worker tasks usage map' /** - * Base class containing some shared logic for all poolifier pools. + * Base class that implements some shared logic for all poolifier pools. * * @template Worker Type of worker which manages this pool. * @template Data Type of data sent to the worker. This can only be serializable data. * @template Response Type of response of execution. This can only be serializable data. */ export abstract class AbstractPool< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data = unknown, Response = unknown > implements IPoolInternal { /** @inheritDoc */ public readonly workers: Worker[] = [] - /** - * The workers tasks usage map. - * - * `key`: The `Worker` - * `value`: Worker tasks usage statistics. - */ - protected workersTasksUsage: Map = new Map< + /** @inheritDoc */ + public readonly workersTasksUsage: Map = new Map< Worker, TasksUsage >() @@ -182,6 +177,9 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy ): void { this.opts.workerChoiceStrategy = workerChoiceStrategy + for (const worker of this.workers) { + this.resetWorkerTasksUsage(worker) + } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( workerChoiceStrategy ) @@ -355,12 +353,7 @@ export abstract class AbstractPool< this.workers.push(worker) // Init worker tasks usage map - this.workersTasksUsage.set(worker, { - run: 0, - running: 0, - runTime: 0, - avgRunTime: 0 - }) + this.initWorkerTasksUsage(worker) this.afterWorkerSetup(worker) @@ -469,6 +462,20 @@ export abstract class AbstractPool< } } + /** + * Initializes tasks usage statistics. + * + * @param worker The worker. + */ + initWorkerTasksUsage (worker: Worker): void { + this.workersTasksUsage.set(worker, { + run: 0, + running: 0, + runTime: 0, + avgRunTime: 0 + }) + } + /** * Removes worker tasks usage statistics. * @@ -477,4 +484,14 @@ export abstract class AbstractPool< private removeWorkerTasksUsage (worker: Worker): void { this.workersTasksUsage.delete(worker) } + + /** + * Resets worker tasks usage statistics. + * + * @param worker The worker. + */ + private resetWorkerTasksUsage (worker: Worker): void { + this.removeWorkerTasksUsage(worker) + this.initWorkerTasksUsage(worker) + } } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index 2cf84adb..6e2850a8 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -1,6 +1,6 @@ import EventEmitter from 'events' -import type { AbstractPoolWorker } from './abstract-pool-worker' import type { IPool } from './pool' +import type { IPoolWorker } from './pool-worker' /** * Pool types. @@ -33,7 +33,7 @@ export class PoolEmitter extends EventEmitter {} * @template Response Type of response of execution. */ export interface IPoolInternal< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data = unknown, Response = unknown > extends IPool { @@ -42,6 +42,14 @@ export interface IPoolInternal< */ readonly workers: Worker[] + /** + * The workers tasks usage map. + * + * `key`: The `Worker` + * `value`: Worker tasks usage statistics. + */ + readonly workersTasksUsage: Map + /** * Emitter on which events can be listened to. * diff --git a/src/pools/pool-worker.ts b/src/pools/pool-worker.ts index d46eda19..c89205ac 100644 --- a/src/pools/pool-worker.ts +++ b/src/pools/pool-worker.ts @@ -28,13 +28,9 @@ export type OnlineHandler = (this: Worker) => void export type ExitHandler = (this: Worker, code: number) => void /** - * Basic interface that describes the minimum required implementation of listener events for a pool worker. + * Interface that describes the minimum required implementation of listener events for a pool worker. */ export interface IPoolWorker { - /** - * Worker identifier. - */ - readonly id?: number /** * Register a listener to the message event. * diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index cd50f700..d624bdd2 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,6 +1,6 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' import { PoolType } from '../pool-internal' +import type { IPoolWorker } from '../pool-worker' import type { IWorkerChoiceStrategy, RequiredStatistics @@ -14,12 +14,12 @@ import type { * @template Response Type of response of execution. This can only be serializable data. */ export abstract class AbstractWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > implements IWorkerChoiceStrategy { /** @inheritDoc */ - public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC + public readonly isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC /** @inheritDoc */ public requiredStatistics: RequiredStatistics = { runTime: false @@ -34,6 +34,9 @@ export abstract class AbstractWorkerChoiceStrategy< protected readonly pool: IPoolInternal ) {} + /** @inheritDoc */ + public abstract resetStatistics (): boolean + /** @inheritDoc */ public abstract choose (): Worker } diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts index 68b48e32..731bb91b 100644 --- a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts @@ -1,5 +1,5 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' +import type { IPoolWorker } from '../pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -16,7 +16,7 @@ import { SelectionStrategiesUtils } from './selection-strategies-utils' * @template Response Type of response of execution. This can only be serializable data. */ export class DynamicPoolWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > extends AbstractWorkerChoiceStrategy { @@ -42,6 +42,11 @@ export class DynamicPoolWorkerChoiceStrategy< this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics } + /** @inheritDoc */ + public resetStatistics (): boolean { + return this.workerChoiceStrategy.resetStatistics() + } + /** @inheritDoc */ public choose (): Worker { const freeWorker = this.pool.findFreeWorker() diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 25a013ea..ee9351ec 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolWorker } from '../pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { RequiredStatistics } from './selection-strategies-types' @@ -19,26 +19,32 @@ type WorkerVirtualTaskTimestamp = { * @template Response Type of response of execution. This can only be serializable data. */ export class FairShareWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > extends AbstractWorkerChoiceStrategy { /** @inheritDoc */ - public requiredStatistics: RequiredStatistics = { + public readonly requiredStatistics: RequiredStatistics = { runTime: true } /** * Worker last virtual task execution timestamp. */ - private workerLastVirtualTaskTimestamp: Map< + private readonly workerLastVirtualTaskTimestamp: Map< Worker, WorkerVirtualTaskTimestamp > = new Map() + /** @inheritDoc */ + public resetStatistics (): boolean { + this.workerLastVirtualTaskTimestamp.clear() + return true + } + /** @inheritDoc */ public choose (): Worker { - this.updateWorkerLastVirtualTaskTimestamp() + this.computeWorkerLastVirtualTaskTimestamp() let minWorkerVirtualTaskEndTimestamp = Infinity let chosenWorker!: Worker for (const worker of this.pool.workers) { @@ -57,7 +63,7 @@ export class FairShareWorkerChoiceStrategy< /** * Computes workers last virtual task timestamp. */ - private updateWorkerLastVirtualTaskTimestamp () { + private computeWorkerLastVirtualTaskTimestamp () { for (const worker of this.pool.workers) { const workerVirtualTaskStartTimestamp = Math.max( Date.now(), diff --git a/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts index 03c6524d..0e2a2bf4 100644 --- a/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolWorker } from '../pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' /** @@ -9,10 +9,15 @@ import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' * @template Response Type of response of execution. This can only be serializable data. */ export class LessRecentlyUsedWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > extends AbstractWorkerChoiceStrategy { + /** @inheritDoc */ + public resetStatistics (): boolean { + return true + } + /** @inheritDoc */ public choose (): Worker { let minNumberOfRunningTasks = Infinity diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 69bffea1..082c7d21 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolWorker } from '../pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' /** @@ -9,7 +9,7 @@ import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' * @template Response Type of response of execution. This can only be serializable data. */ export class RoundRobinWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > extends AbstractWorkerChoiceStrategy { @@ -18,6 +18,11 @@ export class RoundRobinWorkerChoiceStrategy< */ private nextWorkerIndex: number = 0 + /** @inheritDoc */ + public resetStatistics (): boolean { + return true + } + /** @inheritDoc */ public choose (): Worker { const chosenWorker = this.pool.workers[this.nextWorkerIndex] diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 5b844cc5..f875c290 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -1,4 +1,4 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolWorker } from '../pool-worker' /** * Enumeration of worker choice strategies. @@ -28,7 +28,7 @@ export const WorkerChoiceStrategies = Object.freeze({ export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies /** - * Tasks usage statistics requirements. + * Pool tasks usage statistics requirements. */ export type RequiredStatistics = { runTime: boolean @@ -39,15 +39,19 @@ export type RequiredStatistics = { * * @template Worker Type of worker which manages the strategy. */ -export interface IWorkerChoiceStrategy { +export interface IWorkerChoiceStrategy { /** * Is the pool attached to the strategy dynamic?. */ - isDynamicPool: boolean + readonly isDynamicPool: boolean /** - * Required tasks usage statistics. + * Required pool tasks usage statistics. */ - requiredStatistics: RequiredStatistics + readonly requiredStatistics: RequiredStatistics + /** + * Resets strategy internal statistics. + */ + resetStatistics(): boolean /** * Chooses a worker in the pool. */ diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index 699bc719..f1f33d69 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -1,5 +1,5 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' +import type { IPoolWorker } from '../pool-worker' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy' import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy' import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' @@ -22,7 +22,7 @@ export class SelectionStrategiesUtils { * @returns The worker choice strategy instance. */ public static getWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > ( diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index ed869091..925f318c 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -1,6 +1,6 @@ import { cpus } from 'os' -import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' +import type { IPoolWorker } from '../pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { RequiredStatistics } from './selection-strategies-types' @@ -21,12 +21,12 @@ type TaskRunTime = { * @template Response Type of response of execution. This can only be serializable data. */ export class WeightedRoundRobinWorkerChoiceStrategy< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > extends AbstractWorkerChoiceStrategy { /** @inheritDoc */ - public requiredStatistics: RequiredStatistics = { + public readonly requiredStatistics: RequiredStatistics = { runTime: true } @@ -45,7 +45,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** * Per worker virtual task runtime map. */ - private workersTaskRunTime: Map = new Map< + private readonly workersTaskRunTime: Map = new Map< Worker, TaskRunTime >() @@ -61,6 +61,13 @@ export class WeightedRoundRobinWorkerChoiceStrategy< this.initWorkersTaskRunTime() } + /** @inheritDoc */ + public resetStatistics (): boolean { + this.workersTaskRunTime.clear() + this.initWorkersTaskRunTime() + return true + } + /** @inheritDoc */ public choose (): Worker { const currentWorker = this.pool.workers[this.currentWorkerIndex] diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index a88a8eed..49144475 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -1,6 +1,6 @@ -import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' import { PoolType } from '../pool-internal' +import type { IPoolWorker } from '../pool-worker' import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -17,7 +17,7 @@ import { SelectionStrategiesUtils } from './selection-strategies-utils' * @template Response Type of response of execution. This can only be serializable data. */ export class WorkerChoiceStrategyContext< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Data, Response > { @@ -77,6 +77,7 @@ export class WorkerChoiceStrategyContext< public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { + this.workerChoiceStrategy?.resetStatistics() this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( workerChoiceStrategy ) diff --git a/src/utility-types.ts b/src/utility-types.ts index 926b3280..7bcd076c 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,6 +1,6 @@ import type { Worker as ClusterWorker } from 'cluster' import type { MessagePort } from 'worker_threads' -import type { AbstractPoolWorker } from './pools/abstract-pool-worker' +import type { IPoolWorker } from './pools/pool-worker' import type { KillBehavior } from './worker/worker-options' /** @@ -50,7 +50,7 @@ export interface MessageValue< * @template Response Type of response of execution. This can only be serializable data. */ export interface PromiseWorkerResponseWrapper< - Worker extends AbstractPoolWorker, + Worker extends IPoolWorker, Response = unknown > { /** diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 9b170330..4aad8e62 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -10,7 +10,7 @@ const DEFAULT_MAX_INACTIVE_TIME = 1000 * 60 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT /** - * Base class containing some shared logic for all poolifier workers. + * Base class that implements some shared logic for all poolifier workers. * * @template MainWorker Type of main worker. * @template Data Type of data this worker receives from pool's execution. This can only be serializable data. diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 608c3dea..c9a3cdfa 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -46,7 +46,7 @@ export interface WorkerOptions { * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. * - If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. * - * @default 60.000 ms + * @default 60000 ms */ maxInactiveTime?: number /** -- 2.34.1