ErrorHandler,
ExitHandler,
IPoolWorker,
+ MessageHandler,
OnlineHandler
} from './pools/pool-worker'
export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
+++ /dev/null
-import type {
- ErrorHandler,
- ExitHandler,
- IPoolWorker,
- MessageHandler,
- OnlineHandler
-} from './pool-worker'
-
-/**
- * Basic class that implement the minimum required for a pool worker.
- */
-export abstract class AbstractPoolWorker implements IPoolWorker {
- /** @inheritDoc */
- abstract on (event: 'message', handler: MessageHandler<this>): void
- /** @inheritDoc */
- abstract on (event: 'error', handler: ErrorHandler<this>): void
- /** @inheritDoc */
- abstract on (event: 'online', handler: OnlineHandler<this>): void
- /** @inheritDoc */
- abstract on (event: 'exit', handler: ExitHandler<this>): void
- /** @inheritDoc */
- abstract once (event: 'exit', handler: ExitHandler<this>): void
-}
} from '../utility-types'
import { EMPTY_FUNCTION } from '../utils'
import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
-import type { AbstractPoolWorker } from './abstract-pool-worker'
import type { PoolOptions } from './pool'
import type { IPoolInternal, TasksUsage } from './pool-internal'
import { PoolEmitter, PoolType } from './pool-internal'
+import type { IPoolWorker } from './pool-worker'
import {
WorkerChoiceStrategies,
WorkerChoiceStrategy
'Worker could not be found in worker tasks usage map'
/**
- * Base class containing some shared logic for all poolifier pools.
+ * Base class that implements some shared logic for all poolifier pools.
*
* @template Worker Type of worker which manages this pool.
* @template Data Type of data sent to the worker. This can only be serializable data.
* @template Response Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractPool<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data = unknown,
Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
/** @inheritDoc */
public readonly workers: Worker[] = []
- /**
- * The workers tasks usage map.
- *
- * `key`: The `Worker`
- * `value`: Worker tasks usage statistics.
- */
- protected workersTasksUsage: Map<Worker, TasksUsage> = new Map<
+ /** @inheritDoc */
+ public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
Worker,
TasksUsage
>()
workerChoiceStrategy: WorkerChoiceStrategy
): void {
this.opts.workerChoiceStrategy = workerChoiceStrategy
+ for (const worker of this.workers) {
+ this.resetWorkerTasksUsage(worker)
+ }
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
workerChoiceStrategy
)
this.workers.push(worker)
// Init worker tasks usage map
- this.workersTasksUsage.set(worker, {
- run: 0,
- running: 0,
- runTime: 0,
- avgRunTime: 0
- })
+ this.initWorkerTasksUsage(worker)
this.afterWorkerSetup(worker)
}
}
+ /**
+ * Initializes tasks usage statistics.
+ *
+ * @param worker The worker.
+ */
+ initWorkerTasksUsage (worker: Worker): void {
+ this.workersTasksUsage.set(worker, {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ avgRunTime: 0
+ })
+ }
+
/**
* Removes worker tasks usage statistics.
*
private removeWorkerTasksUsage (worker: Worker): void {
this.workersTasksUsage.delete(worker)
}
+
+ /**
+ * Resets worker tasks usage statistics.
+ *
+ * @param worker The worker.
+ */
+ private resetWorkerTasksUsage (worker: Worker): void {
+ this.removeWorkerTasksUsage(worker)
+ this.initWorkerTasksUsage(worker)
+ }
}
import EventEmitter from 'events'
-import type { AbstractPoolWorker } from './abstract-pool-worker'
import type { IPool } from './pool'
+import type { IPoolWorker } from './pool-worker'
/**
* Pool types.
* @template Response Type of response of execution.
*/
export interface IPoolInternal<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data = unknown,
Response = unknown
> extends IPool<Data, Response> {
*/
readonly workers: Worker[]
+ /**
+ * The workers tasks usage map.
+ *
+ * `key`: The `Worker`
+ * `value`: Worker tasks usage statistics.
+ */
+ readonly workersTasksUsage: Map<Worker, TasksUsage>
+
/**
* Emitter on which events can be listened to.
*
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
- * Basic interface that describes the minimum required implementation of listener events for a pool worker.
+ * Interface that describes the minimum required implementation of listener events for a pool worker.
*/
export interface IPoolWorker {
- /**
- * Worker identifier.
- */
- readonly id?: number
/**
* Register a listener to the message event.
*
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
import { PoolType } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
import type {
IWorkerChoiceStrategy,
RequiredStatistics
* @template Response Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> implements IWorkerChoiceStrategy<Worker> {
/** @inheritDoc */
- public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+ public readonly isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
/** @inheritDoc */
public requiredStatistics: RequiredStatistics = {
runTime: false
protected readonly pool: IPoolInternal<Worker, Data, Response>
) {}
+ /** @inheritDoc */
+ public abstract resetStatistics (): boolean
+
/** @inheritDoc */
public abstract choose (): Worker
}
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
* @template Response Type of response of execution. This can only be serializable data.
*/
export class DynamicPoolWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics
}
+ /** @inheritDoc */
+ public resetStatistics (): boolean {
+ return this.workerChoiceStrategy.resetStatistics()
+ }
+
/** @inheritDoc */
public choose (): Worker {
const freeWorker = this.pool.findFreeWorker()
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type { RequiredStatistics } from './selection-strategies-types'
* @template Response Type of response of execution. This can only be serializable data.
*/
export class FairShareWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
/** @inheritDoc */
- public requiredStatistics: RequiredStatistics = {
+ public readonly requiredStatistics: RequiredStatistics = {
runTime: true
}
/**
* Worker last virtual task execution timestamp.
*/
- private workerLastVirtualTaskTimestamp: Map<
+ private readonly workerLastVirtualTaskTimestamp: Map<
Worker,
WorkerVirtualTaskTimestamp
> = new Map<Worker, WorkerVirtualTaskTimestamp>()
+ /** @inheritDoc */
+ public resetStatistics (): boolean {
+ this.workerLastVirtualTaskTimestamp.clear()
+ return true
+ }
+
/** @inheritDoc */
public choose (): Worker {
- this.updateWorkerLastVirtualTaskTimestamp()
+ this.computeWorkerLastVirtualTaskTimestamp()
let minWorkerVirtualTaskEndTimestamp = Infinity
let chosenWorker!: Worker
for (const worker of this.pool.workers) {
/**
* Computes workers last virtual task timestamp.
*/
- private updateWorkerLastVirtualTaskTimestamp () {
+ private computeWorkerLastVirtualTaskTimestamp () {
for (const worker of this.pool.workers) {
const workerVirtualTaskStartTimestamp = Math.max(
Date.now(),
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
/**
* @template Response Type of response of execution. This can only be serializable data.
*/
export class LessRecentlyUsedWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /** @inheritDoc */
+ public resetStatistics (): boolean {
+ return true
+ }
+
/** @inheritDoc */
public choose (): Worker {
let minNumberOfRunningTasks = Infinity
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
/**
* @template Response Type of response of execution. This can only be serializable data.
*/
export class RoundRobinWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
*/
private nextWorkerIndex: number = 0
+ /** @inheritDoc */
+ public resetStatistics (): boolean {
+ return true
+ }
+
/** @inheritDoc */
public choose (): Worker {
const chosenWorker = this.pool.workers[this.nextWorkerIndex]
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolWorker } from '../pool-worker'
/**
* Enumeration of worker choice strategies.
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
/**
- * Tasks usage statistics requirements.
+ * Pool tasks usage statistics requirements.
*/
export type RequiredStatistics = {
runTime: boolean
*
* @template Worker Type of worker which manages the strategy.
*/
-export interface IWorkerChoiceStrategy<Worker extends AbstractPoolWorker> {
+export interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
/**
* Is the pool attached to the strategy dynamic?.
*/
- isDynamicPool: boolean
+ readonly isDynamicPool: boolean
/**
- * Required tasks usage statistics.
+ * Required pool tasks usage statistics.
*/
- requiredStatistics: RequiredStatistics
+ readonly requiredStatistics: RequiredStatistics
+ /**
+ * Resets strategy internal statistics.
+ */
+ resetStatistics(): boolean
/**
* Chooses a worker in the pool.
*/
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
* @returns The worker choice strategy instance.
*/
public static getWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> (
import { cpus } from 'os'
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type { RequiredStatistics } from './selection-strategies-types'
* @template Response Type of response of execution. This can only be serializable data.
*/
export class WeightedRoundRobinWorkerChoiceStrategy<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
/** @inheritDoc */
- public requiredStatistics: RequiredStatistics = {
+ public readonly requiredStatistics: RequiredStatistics = {
runTime: true
}
/**
* Per worker virtual task runtime map.
*/
- private workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
+ private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
Worker,
TaskRunTime
>()
this.initWorkersTaskRunTime()
}
+ /** @inheritDoc */
+ public resetStatistics (): boolean {
+ this.workersTaskRunTime.clear()
+ this.initWorkersTaskRunTime()
+ return true
+ }
+
/** @inheritDoc */
public choose (): Worker {
const currentWorker = this.pool.workers[this.currentWorkerIndex]
-import type { AbstractPoolWorker } from '../abstract-pool-worker'
import type { IPoolInternal } from '../pool-internal'
import { PoolType } from '../pool-internal'
+import type { IPoolWorker } from '../pool-worker'
import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
* @template Response Type of response of execution. This can only be serializable data.
*/
export class WorkerChoiceStrategyContext<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Data,
Response
> {
public setWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
): void {
+ this.workerChoiceStrategy?.resetStatistics()
this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
workerChoiceStrategy
)
import type { Worker as ClusterWorker } from 'cluster'
import type { MessagePort } from 'worker_threads'
-import type { AbstractPoolWorker } from './pools/abstract-pool-worker'
+import type { IPoolWorker } from './pools/pool-worker'
import type { KillBehavior } from './worker/worker-options'
/**
* @template Response Type of response of execution. This can only be serializable data.
*/
export interface PromiseWorkerResponseWrapper<
- Worker extends AbstractPoolWorker,
+ Worker extends IPoolWorker,
Response = unknown
> {
/**
const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
/**
- * Base class containing some shared logic for all poolifier workers.
+ * Base class that implements some shared logic for all poolifier workers.
*
* @template MainWorker Type of main worker.
* @template Data Type of data this worker receives from pool's execution. This can only be serializable data.
* when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
* - If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
*
- * @default 60.000 ms
+ * @default 60000 ms
*/
maxInactiveTime?: number
/**