+import { AsyncResource } from 'node:async_hooks'
import { randomUUID } from 'node:crypto'
+import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
-import { EventEmitterAsyncResource } from 'node:events'
-import { AsyncResource } from 'node:async_hooks'
+
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
- Task
-} from '../utility-types'
+ Task,
+ TaskFunctionProperties,
+} from '../utility-types.js'
import {
+ average,
+ buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
- average,
exponentialDelay,
isKillBehavior,
isPlainObject,
median,
min,
round,
- sleep
-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
-import type { TaskFunction } from '../worker/task-functions'
+ sleep,
+} from '../utils.js'
+import type {
+ TaskFunction,
+ TaskFunctionObject,
+} from '../worker/task-functions.js'
+import { KillBehaviors } from '../worker/worker-options.js'
import {
type IPool,
PoolEvents,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions
-} from './pool'
-import type {
- IWorker,
- IWorkerNode,
- TaskStatistics,
- WorkerInfo,
- WorkerNodeEventDetail,
- WorkerType,
- WorkerUsage
-} from './worker'
+ type TasksQueueOptions,
+} from './pool.js'
import {
- type MeasurementStatisticsRequirements,
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+ type WorkerChoiceStrategyOptions,
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
+ checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
- updateMeasurementStatistics
-} from './utils'
+ getDefaultTasksQueueOptions,
+ updateEluWorkerUsage,
+ updateRunTimeWorkerUsage,
+ updateTaskStatisticsWorkerUsage,
+ updateWaitTimeWorkerUsage,
+ waitWorkerNodeEvents,
+} from './utils.js'
+import { version } from './version.js'
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerNodeEventDetail,
+ WorkerType,
+} from './worker.js'
+import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
- *
* @typeParam Worker - Type of worker which manages this pool.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
+ public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
+ * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
- protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
- new Map<string, PromiseResponseWrapper<Response>>()
+ protected promiseResponseMap: Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ > = new Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ >()
/**
- * Worker choice strategy context referencing a worker choice algorithm implementation.
+ * Worker choice strategies context referencing worker choice algorithms implementation.
*/
- protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
+ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
+ Worker,
+ Data,
+ Response
>
/**
* The task functions added at runtime map:
* - `key`: The task function name.
- * - `value`: The task function itself.
+ * - `value`: The task function object.
*/
- private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+ private readonly taskFunctions: Map<
+ string,
+ TaskFunctionObject<Data, Response>
+ >
/**
* Whether the pool is started or not.
* Whether the pool is destroying or not.
*/
private destroying: boolean
+ /**
+ * Whether the minimum number of workers is starting or not.
+ */
+ private startingMinimumNumberOfWorkers: boolean
/**
* Whether the pool ready event has been emitted or not.
*/
/**
* The start timestamp of the pool.
*/
- private readonly startTimestamp
+ private startTimestamp?: number
/**
* Constructs a new poolifier pool.
- *
- * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
*/
public constructor (
- protected readonly numberOfWorkers: number,
+ protected readonly minimumNumberOfWorkers: number,
protected readonly filePath: string,
- protected readonly opts: PoolOptions<Worker>
+ protected readonly opts: PoolOptions<Worker>,
+ protected readonly maximumNumberOfWorkers?: number
) {
if (!this.isMain()) {
throw new Error(
'Cannot start a pool from a worker with the same type as the pool'
)
}
+ this.checkPoolType()
checkFilePath(this.filePath)
- this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
- this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
+ this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
+ Worker,
+ Data,
+ Response
>(
this,
- this.opts.workerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ [this.opts.workerChoiceStrategy!],
this.opts.workerChoiceStrategyOptions
)
this.setupHook()
- this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+ this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
this.started = false
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
+ }
- this.startTimestamp = performance.now()
+ private checkPoolType (): void {
+ if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+ throw new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ }
}
- private checkNumberOfWorkers (numberOfWorkers: number): void {
- if (numberOfWorkers == null) {
+ private checkMinimumNumberOfWorkers (
+ minimumNumberOfWorkers: number | undefined
+ ): void {
+ if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
- } else if (!Number.isSafeInteger(numberOfWorkers)) {
+ } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
- } else if (numberOfWorkers < 0) {
+ } else if (minimumNumberOfWorkers < 0) {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ opts.workerChoiceStrategyOptions
)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...opts.workerChoiceStrategyOptions
+ if (opts.workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
}
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
+ checkValidTasksQueueOptions(opts.tasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
+ opts.tasksQueueOptions
)
}
} else {
}
private checkValidWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
if (
workerChoiceStrategyOptions != null &&
'Invalid worker choice strategy options: must be a plain object'
)
}
- if (
- workerChoiceStrategyOptions?.retries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
- ) {
- throw new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- }
- if (
- workerChoiceStrategyOptions?.retries != null &&
- workerChoiceStrategyOptions.retries < 0
- ) {
- throw new RangeError(
- `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
- )
- }
if (
workerChoiceStrategyOptions?.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ Object.keys(workerChoiceStrategyOptions.weights).length !==
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
- name: `poolifier:${this.type}-${this.worker}-pool`
+ name: `poolifier:${this.type}-${this.worker}-pool`,
})
}
worker: this.worker,
started: this.started,
ready: this.ready,
- strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
- minSize: this.minSize,
- maxSize: this.maxSize,
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.aggregate && { utilization: round(this.utilization) }),
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ defaultStrategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
+ minSize: this.minimumNumberOfWorkers,
+ maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true &&
+ this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ utilization: round(this.utilization),
+ }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ ),
+ }),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator, workerNode) =>
- workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+ (accumulator, _, workerNodeKey) =>
+ this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
),
executedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.queued,
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
- backPressure: this.hasBackPressure()
+ backPressure: this.hasBackPressure(),
}),
...(this.opts.enableTasksQueue === true && {
stolenTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.stolen,
0
- )
+ ),
}),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate && {
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true && {
runTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
+ ),
+ }),
+ },
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
- })
+ ),
+ }),
+ },
+ }),
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate === true && {
+ elu: {
+ idle: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ active: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ utilization: {
+ average: round(
+ average(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ },
+ },
+ }),
}
}
/**
- * The pool readiness boolean status.
+ * Whether the pool is ready or not.
+ * @returns The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
? accumulator + 1
: accumulator,
0
- ) >= this.minSize
+ ) >= this.minimumNumberOfWorkers
)
}
+ /**
+ * Whether the pool is empty or not.
+ * @returns The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ }
+
/**
* The approximate pool utilization.
- *
* @returns The pool utilization.
*/
private get utilization (): number {
+ if (this.startTimestamp == null) {
+ return 0
+ }
const poolTimeCapacity =
- (performance.now() - this.startTimestamp) * this.maxSize
+ (performance.now() - this.startTimestamp) *
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.runTime.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
*/
protected abstract get worker (): WorkerType
- /**
- * The pool minimum size.
- */
- protected get minSize (): number {
- return this.numberOfWorkers
- }
-
- /**
- * The pool maximum size.
- */
- protected get maxSize (): number {
- return this.max ?? this.numberOfWorkers
- }
-
/**
* Checks if the worker id sent in the received message from a worker is valid.
- *
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
throw new Error('Worker message received without worker id')
} else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
- `Worker message received from unknown worker '${message.workerId}'`
+ `Worker message received from unknown worker '${message.workerId.toString()}'`
)
}
}
- /**
- * Gets the given worker its worker node key.
- *
- * @param worker - The worker.
- * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
- */
- private getWorkerNodeKeyByWorker (worker: Worker): number {
- return this.workerNodes.findIndex(
- workerNode => workerNode.worker === worker
- )
- }
-
/**
* Gets the worker node key given its worker id.
- *
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
+ let requireSync = false
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
- this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
- this.opts.workerChoiceStrategy
- )
if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ requireSync = !this.setWorkerChoiceStrategyOptions(
+ workerChoiceStrategyOptions
+ )
}
- for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
- workerNode.resetUsage()
- this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
+ this.opts.workerChoiceStrategy = workerChoiceStrategy
+ this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+ this.opts.workerChoiceStrategy,
+ this.opts.workerChoiceStrategyOptions
+ )
+ requireSync = true
+ }
+ if (requireSync) {
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
}
}
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
- ): void {
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
+ ): boolean {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...workerChoiceStrategyOptions
+ if (workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategiesContext?.setOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return true
}
- this.workerChoiceStrategyContext.setOptions(
- this.opts.workerChoiceStrategyOptions
- )
+ return false
}
/** @inheritDoc */
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+ this.setTasksQueueOptions(tasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ public setTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions | undefined
+ ): void {
if (this.opts.enableTasksQueue === true) {
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
if (this.opts.tasksQueueOptions.taskStealing === true) {
this.unsetTaskStealing()
this.setTaskStealing()
}
private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
- ...tasksQueueOptions
+ ...getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ),
+ ...tasksQueueOptions,
}
}
}
private setTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
private setTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
private unsetTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
/**
* Whether the pool is full or not.
- *
- * The pool filling boolean status.
+ * @returns The pool fullness boolean status.
*/
protected get full (): boolean {
- return this.workerNodes.length >= this.maxSize
+ return (
+ this.workerNodes.length >=
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ )
}
/**
* Whether the pool is busy or not.
- *
- * The pool busyness boolean status.
+ * @returns The pool busyness boolean status.
*/
protected abstract get busy (): boolean
/**
* Whether worker nodes are executing concurrently their tasks quota or not.
- *
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
workerNode =>
workerNode.info.ready &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) === -1
)
}
)
}
+ private isWorkerNodeBusy (workerNodeKey: number): boolean {
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
+ )
+ }
+ return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
+ }
+
private async sendTaskFunctionOperationToWorker (
workerNodeKey: number,
message: MessageValue<Data>
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
) {
if (message.taskFunctionOperationStatus) {
resolve(true)
- } else if (!message.taskFunctionOperationStatus) {
+ } else {
reject(
new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- }' failed on worker ${message.workerId} with error: '${
- message.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ message.workerError?.message
}'`
)
)
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ errorResponse?.workerError?.message
}'`
)
)
}
}
}
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(
workerNodeKey,
taskFunctionOperationsListener
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
- for (const workerNode of this.workerNodes) {
- if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.includes(name)
- ) {
- return true
- }
- }
- return false
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- fn: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
if (typeof name !== 'string') {
throw new TypeError('name argument must be a string')
if (typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
- if (typeof fn !== 'function') {
- throw new TypeError('fn argument must be a function')
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
}
+ if (typeof fn.taskFunction !== 'function') {
+ throw new TypeError('taskFunction property must be a function')
+ }
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: name,
- taskFunction: fn.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+ taskFunction: fn.taskFunction.toString(),
})
this.taskFunctions.set(name, fn)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
/** @inheritDoc */
- public listTaskFunctionNames (): string[] {
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.length > 0
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
) {
- return workerNode.info.taskFunctionNames
+ return workerNode.info.taskFunctionsProperties
}
}
return []
}
+ /**
+ * Gets task function worker choice strategy, if any.
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
+ }
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function priority, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+
+ /**
+ * Gets the worker choice strategies registered in this pool.
+ * @returns The worker choice strategies.
+ */
+ private readonly getWorkerChoiceStrategies =
+ (): Set<WorkerChoiceStrategy> => {
+ return new Set([
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.workerChoiceStrategy!,
+ ...(this.listTaskFunctionsProperties()
+ .map(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.strategy
+ )
+ .filter(
+ (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+ ) as WorkerChoiceStrategy[]),
+ ])
+ }
+
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'default',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
public async execute (
data?: Data,
name?: string,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
+ const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+ workerNodeKey,
+ name
+ ),
transferList,
timestamp,
- taskId: randomUUID()
+ taskId: randomUUID(),
}
- this.promiseResponseMap.set(task.taskId as string, {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
resolve,
reject,
workerNodeKey,
...(this.emitter != null && {
asyncResource: new AsyncResource('poolifier:task', {
triggerAsyncId: this.emitter.asyncId,
- requireManualDestroy: true
- })
- })
+ requireManualDestroy: true,
+ }),
+ }),
})
if (
this.opts.enableTasksQueue === false ||
})
}
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (data == null) {
+ throw new TypeError('data argument must be a defined iterable')
+ }
+ if (typeof data[Symbol.iterator] !== 'function') {
+ throw new TypeError('data argument must be an iterable')
+ }
+ if (!Array.isArray(data)) {
+ data = [...data]
+ }
+ return Promise.all(
+ (data as Data[]).map(data => this.execute(data, name, transferList))
+ )
+ }
+
+ /**
+ * Starts the minimum number of workers.
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
+ */
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
+ if (this.minimumNumberOfWorkers === 0) {
+ return
+ }
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
/** @inheritdoc */
public start (): void {
if (this.started) {
throw new Error('Cannot start a destroying pool')
}
this.starting = true
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.numberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
+ this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
this.starting = false
this.started = true
}
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+ this.workerNodes.map(async (_, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
- this.emitter?.removeAllListeners()
this.readyEventEmitted = false
+ delete this.startTimestamp
this.destroying = false
this.started = false
}
- protected async sendKillMessageToWorker (
- workerNodeKey: number
- ): Promise<void> {
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
+ resolve()
+ return
+ }
const killMessageListener = (message: MessageValue<Response>): void => {
this.checkMessageWorkerId(message)
if (message.kill === 'success') {
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Kill message handling failed on worker ${message.workerId?.toString()}`
)
)
}
/**
* Terminates the worker node given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
- protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+ protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
+ const flushedTasks = this.flushTasksQueue(workerNodeKey)
+ const workerNode = this.workerNodes[workerNodeKey]
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
+ )
+ await this.sendKillMessageToWorker(workerNodeKey)
+ await workerNode.terminate()
+ }
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
* Can be overridden.
- *
- * @virtual
*/
protected setupHook (): void {
/* Intentionally empty */
}
/**
- * Should return whether the worker is the main worker or not.
+ * Returns whether the worker is the main worker or not.
+ * @returns `true` if the worker is the main worker, `false` otherwise.
*/
protected abstract isMain (): boolean
/**
* Hook executed before the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
workerNodeKey: number,
task: Task<Data>
): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(workerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ workerUsage,
+ task
+ )
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- task.name as string
- ) != null
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+ null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ taskFunctionWorkerUsage,
+ task
+ )
}
}
/**
* Hook executed after the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param message - The received message.
*/
workerNodeKey: number,
message: MessageValue<Response>
): void {
+ let needWorkerChoiceStrategiesUpdate = false
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
- this.updateTaskStatisticsWorkerUsage(workerUsage, message)
- this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateEluWorkerUsage(workerUsage, message)
+ updateTaskStatisticsWorkerUsage(workerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ workerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ workerUsage,
+ message
+ )
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.taskPerformance!.name
) != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
- this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
+ updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+ updateRunTimeWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ updateEluWorkerUsage(
+ this.workerChoiceStrategiesContext,
+ taskFunctionWorkerUsage,
+ message
+ )
+ needWorkerChoiceStrategiesUpdate = true
+ }
+ if (needWorkerChoiceStrategiesUpdate) {
+ this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
/**
* Whether the worker node shall update its task function worker usage or not.
- *
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
*/
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctionNames) &&
- workerInfo.taskFunctionNames.length > 2
- )
- }
-
- private updateTaskStatisticsWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- const workerTaskStatistics = workerUsage.tasks
- if (
- workerTaskStatistics.executing != null &&
- workerTaskStatistics.executing > 0
- ) {
- --workerTaskStatistics.executing
- }
- if (message.workerError == null) {
- ++workerTaskStatistics.executed
- } else {
- ++workerTaskStatistics.failed
- }
- }
-
- private updateRunTimeWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- updateMeasurementStatistics(
- workerUsage.runTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
- message.taskPerformance?.runTime ?? 0
- )
- }
-
- private updateWaitTimeWorkerUsage (
- workerUsage: WorkerUsage,
- task: Task<Data>
- ): void {
- const timestamp = performance.now()
- const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
- updateMeasurementStatistics(
- workerUsage.waitTime,
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
- taskWaitTime
- )
- }
-
- private updateEluWorkerUsage (
- workerUsage: WorkerUsage,
- message: MessageValue<Response>
- ): void {
- if (message.workerError != null) {
- return
- }
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- updateMeasurementStatistics(
- workerUsage.elu.active,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.active ?? 0
- )
- updateMeasurementStatistics(
- workerUsage.elu.idle,
- eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.idle ?? 0
+ Array.isArray(workerInfo.taskFunctionsProperties) &&
+ workerInfo.taskFunctionsProperties.length > 2
)
- if (eluTaskStatisticsRequirements.aggregate) {
- if (message.taskPerformance?.elu != null) {
- if (workerUsage.elu.utilization != null) {
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else {
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- }
- }
}
/**
* Chooses a worker node for the next task.
- *
- * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
- *
- * @returns The chosen worker node key
+ * @param name - The task function name.
+ * @returns The chosen worker node key.
*/
- private chooseWorkerNode (): number {
+ private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
return workerNodeKey
}
}
- return this.workerChoiceStrategyContext.execute()
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.workerChoiceStrategiesContext!.execute(
+ this.getTaskFunctionWorkerChoiceStrategy(name)
+ )
}
/**
* Conditions for dynamic worker creation.
- *
* @returns Whether to create a dynamic worker or not.
*/
- private shallCreateDynamicWorker (): boolean {
- return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
- }
+ protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param message - The message.
* @param transferList - The optional array of transferable objects.
protected abstract sendToWorker (
workerNodeKey: number,
message: MessageValue<Data>,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): void
/**
- * Creates a new worker.
- *
- * @returns Newly created worker.
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ * @param workerNode - The worker node.
*/
- protected abstract createWorker (): Worker
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ }
/**
* Creates a new, completely set up worker node.
- *
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
- const worker = this.createWorker()
-
- worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
- worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
- worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
- worker.on('error', error => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
- this.flagWorkerNodeAsNotReady(workerNodeKey)
- const workerInfo = this.getWorkerInfo(workerNodeKey)
+ const workerNode = this.createWorkerNode()
+ workerNode.registerWorkerEventHandler(
+ 'online',
+ this.opts.onlineHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'message',
+ this.opts.messageHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'error',
+ this.opts.errorHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
+ workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
- this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
- !this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
- if (workerInfo.dynamic) {
+ if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
- } else {
- this.createAndSetupWorkerNode()
+ } else if (!this.startingMinimumNumberOfWorkers) {
+ this.startMinimumNumberOfWorkers(true)
}
}
- if (this.started && this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(workerNodeKey)
+ if (
+ this.started &&
+ !this.destroying &&
+ this.opts.enableTasksQueue === true
+ ) {
+ this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch((error: unknown) => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
})
- worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
- worker.once('exit', () => {
- this.removeWorkerNode(worker)
+ workerNode.registerWorkerEventHandler(
+ 'exit',
+ this.opts.exitHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
+ this.removeWorkerNode(workerNode)
+ if (
+ this.started &&
+ !this.startingMinimumNumberOfWorkers &&
+ !this.destroying
+ ) {
+ this.startMinimumNumberOfWorkers(true)
+ }
})
-
- const workerNodeKey = this.addWorkerNode(worker)
-
+ const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
-
return workerNodeKey
}
/**
* Creates a new, completely set up dynamic worker node.
- *
* @returns New, completely set up dynamic worker node key.
*/
protected createAndSetupDynamicWorkerNode (): number {
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
+ const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
- this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
- checkActive: true
+ checkActive: true,
})
if (this.taskFunctions.size > 0) {
- for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
this.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName,
- taskFunction: taskFunction.toString()
- }).catch(error => {
+ taskFunctionProperties: buildTaskFunctionProperties(
+ taskFunctionName,
+ taskFunctionObject
+ ),
+ taskFunction: taskFunctionObject.taskFunction.toString(),
+ }).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
+ true ||
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
/**
* Registers a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Registers once a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Deregisters a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Method hooked up after a worker node has been newly created.
* Can be overridden.
- *
* @param workerNodeKey - The newly created worker node key.
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
/**
* Sends the startup message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
* Sends the statistics message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate,
- elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu.aggregate
- }
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate ?? false,
+ elu:
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate ?? false,
+ },
})
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- if (this.workerNodes.length <= 1) {
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
+ private handleTask (workerNodeKey: number, task: Task<Data>): void {
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ }
+
+ private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+ if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
}
- while (this.tasksQueueSize(workerNodeKey) > 0) {
+ while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.info.ready &&
+ return sourceWorkerNodeKey !== workerNodeKey &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
},
0
)
- const task = this.dequeueTask(workerNodeKey) as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
- } else {
- this.enqueueTask(destinationWorkerNodeKey, task)
- }
+ this.handleTask(
+ destinationWorkerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.dequeueTask(sourceWorkerNodeKey)!
+ )
}
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
- ++taskFunctionWorkerUsage.tasks.stolen
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
}
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string,
+ previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
- }
-
- private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
- ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
+ if (
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+ (previousTaskName != null &&
+ previousTaskName === taskName &&
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)
+ ) {
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) {
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
}
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
- }
-
- private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(
taskName
- ) as WorkerUsage
- taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ )!.tasks.sequentiallyStolen = 0
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey attribute must be defined'
+ "WorkerNode event detail 'workerNodeKey' property must be defined"
+ )
+ }
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
- const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
- previousStolenTask != null &&
- workerNodeTasksUsage.sequentiallyStolen > 0 &&
- (workerNodeTasksUsage.executing > 0 ||
- this.tasksQueueSize(workerNodeKey) > 0)
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
- for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames as string[]) {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ if (previousStolenTask != null) {
+ workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
workerNodeKey,
- taskName
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
)
}
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
- const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- stolenTask != null
+ previousStolenTask != null &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
) {
- const taskFunctionTasksWorkerUsage = this.workerNodes[
- workerNodeKey
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
- if (
- taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
- (previousStolenTask != null &&
- previousStolenTask.name === stolenTask.name &&
- taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
- ) {
- this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- stolenTask.name as string
- )
- } else {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- stolenTask.name as string
- )
- }
+ workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
+ return
+ }
+ workerInfo.stealing = true
+ const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (stolenTask != null) {
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!,
+ previousStolenTask?.name
+ )
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(EMPTY_FUNCTION)
+ .catch((error: unknown) => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
}
private readonly workerNodeStealTask = (
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
- }
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+ this.handleTask(workerNodeKey, task)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ this.hasBackPressure() ||
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
- const { workerId } = eventDetail
const sizeOffset = 1
- if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
+ const { workerId } = eventDetail
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - sizeOffset
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(workerNodeKey)) {
- this.executeTask(workerNodeKey, task)
- } else {
- this.enqueueTask(workerNodeKey, task)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
+ )
}
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ workerInfo.stealing = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
+ this.handleTask(workerNodeKey, task)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ workerInfo.stealing = false
}
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
+ * @param message - The message received from the worker.
*/
protected readonly workerMessageListener = (
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const { workerId, ready, taskId, taskFunctionNames } = message
- if (ready != null && taskFunctionNames != null) {
+ const { workerId, ready, taskId, taskFunctionsProperties } = message
+ if (ready != null && taskFunctionsProperties != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
+ } else if (taskFunctionsProperties != null) {
+ // Task function properties message received from worker
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
+ }
} else if (taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (taskFunctionNames != null) {
- // Task function names message received from worker
- this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
}
}
- private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
- }
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ private checkAndEmitReadyEvent (): void {
if (!this.readyEventEmitted && this.ready) {
- this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
+ }
+ }
+
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
+ const { workerId, ready, taskFunctionsProperties } = message
+ if (ready == null || !ready) {
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
}
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
- const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+ const workerNode = this.workerNodes[workerNodeKey]
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
asyncResource != null
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.workerChoiceStrategyContext.update(workerNodeKey)
- this.promiseResponseMap.delete(taskId as string)
- if (this.opts.enableTasksQueue === true) {
- const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.delete(taskId!)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.emit('taskFinished', taskId)
+ if (
+ this.opts.enableTasksQueue === true &&
+ !this.destroying &&
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode != null
+ ) {
+ const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
- workerId: workerId as number,
- workerNodeKey
+ workerNode.emit('idle', {
+ workerId,
+ workerNodeKey,
})
}
}
}
}
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
* Gets the worker information given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
- * Adds the given worker in the pool worker nodes.
- *
- * @param worker - The worker.
- * @returns The added worker node key.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ * Creates a worker node.
+ * @returns The created worker node.
*/
- private addWorkerNode (worker: Worker): number {
+ private createWorkerNode (): IWorkerNode<Worker, Data> {
const workerNode = new WorkerNode<Worker, Data>(
- worker,
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.worker,
+ this.filePath,
+ {
+ env: this.opts.env,
+ workerOptions: this.opts.workerOptions,
+ tasksQueueBackPressureSize:
+ this.opts.tasksQueueOptions?.size ??
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).size,
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority(),
+ }
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
+ return workerNode
+ }
+
+ /**
+ * Adds the given worker node in the pool worker nodes.
+ * @param workerNode - The worker node.
+ * @returns The added worker node key.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ */
+ private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
this.workerNodes.push(workerNode)
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey === -1) {
throw new Error('Worker added not found in worker nodes')
}
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
- * Removes the given worker from the pool worker nodes.
- *
- * @param worker - The worker.
+ * Removes the worker node from the pool worker nodes.
+ * @param workerNode - The worker node.
*/
- private removeWorkerNode (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ this.workerChoiceStrategiesContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
- }
-
- /** @inheritDoc */
- public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
- return (
- this.opts.enableTasksQueue === true &&
- this.workerNodes[workerNodeKey].hasBackPressure()
- )
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {
/**
* Executes the given task on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
return this.workerNodes[workerNodeKey].tasksQueueSize()
}
- protected flushTasksQueue (workerNodeKey: number): void {
+ protected flushTasksQueue (workerNodeKey: number): number {
+ let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
+ ++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()
+ return flushedTasks
}
private flushTasksQueues (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.flushTasksQueue(workerNodeKey)
}
}