+import { AsyncResource } from 'node:async_hooks'
import { randomUUID } from 'node:crypto'
+import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
-import { EventEmitterAsyncResource } from 'node:events'
-import { AsyncResource } from 'node:async_hooks'
+
+import { defaultBucketSize } from '../priority-queue.js'
import type {
MessageValue,
PromiseResponseWrapper,
- Task
+ Task,
+ TaskFunctionProperties,
} from '../utility-types.js'
import {
+ average,
+ buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
- average,
exponentialDelay,
isKillBehavior,
isPlainObject,
median,
min,
round,
- sleep
+ sleep,
} from '../utils.js'
+import type {
+ TaskFunction,
+ TaskFunctionObject,
+} from '../worker/task-functions.js'
import { KillBehaviors } from '../worker/worker-options.js'
-import type { TaskFunction } from '../worker/task-functions.js'
import {
type IPool,
PoolEvents,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions
+ type TasksQueueOptions,
} from './pool.js'
-import type {
- IWorker,
- IWorkerNode,
- WorkerInfo,
- WorkerNodeEventDetail,
- WorkerType
-} from './worker.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
+ type WorkerChoiceStrategyOptions,
} from './selection-strategies/selection-strategies-types.js'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
-import { version } from './version.js'
-import { WorkerNode } from './worker-node.js'
+import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
+ checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
getDefaultTasksQueueOptions,
updateRunTimeWorkerUsage,
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
- waitWorkerNodeEvents
+ waitWorkerNodeEvents,
} from './utils.js'
+import { version } from './version.js'
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerNodeEventDetail,
+ WorkerType,
+} from './worker.js'
+import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
- *
* @typeParam Worker - Type of worker which manages this pool.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
+ public readonly workerNodes: IWorkerNode<Worker, Data>[] = []
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
+ * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
- protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
- new Map<string, PromiseResponseWrapper<Response>>()
+ protected promiseResponseMap: Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ > = new Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ PromiseResponseWrapper<Response>
+ >()
/**
- * Worker choice strategy context referencing a worker choice algorithm implementation.
+ * Worker choice strategies context referencing worker choice algorithms implementation.
*/
- protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
+ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
+ Worker,
+ Data,
+ Response
>
/**
* The task functions added at runtime map:
* - `key`: The task function name.
- * - `value`: The task function itself.
+ * - `value`: The task function object.
*/
- private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+ private readonly taskFunctions: Map<
+ string,
+ TaskFunctionObject<Data, Response>
+ >
/**
* Whether the pool is started or not.
* Whether the pool is destroying or not.
*/
private destroying: boolean
+ /**
+ * Whether the minimum number of workers is starting or not.
+ */
+ private startingMinimumNumberOfWorkers: boolean
/**
* Whether the pool ready event has been emitted or not.
*/
/**
* The start timestamp of the pool.
*/
- private readonly startTimestamp
+ private startTimestamp?: number
/**
* Constructs a new poolifier pool.
- *
* @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
- this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
+ this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
+ Worker,
+ Data,
+ Response
>(
this,
- this.opts.workerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ [this.opts.workerChoiceStrategy!],
this.opts.workerChoiceStrategyOptions
)
this.setupHook()
- this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+ this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
this.started = false
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
-
- this.startTimestamp = performance.now()
}
private checkPoolType (): void {
}
}
- private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ private checkMinimumNumberOfWorkers (
+ minimumNumberOfWorkers: number | undefined
+ ): void {
if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.workerChoiceStrategyOptions!
+ opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidTasksQueueOptions(opts.tasksQueueOptions!)
+ checkValidTasksQueueOptions(opts.tasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.tasksQueueOptions!
+ opts.tasksQueueOptions
)
}
} else {
}
private checkValidWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
if (
workerChoiceStrategyOptions != null &&
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
- name: `poolifier:${this.type}-${this.worker}-pool`
+ name: `poolifier:${this.type}-${this.worker}-pool`,
})
}
started: this.started,
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- strategy: this.opts.workerChoiceStrategy!,
+ defaultStrategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate &&
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && { utilization: round(this.utilization) }),
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true &&
+ this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ utilization: round(this.utilization),
+ }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
(accumulator, workerNode) =>
workerNode.info.stealing ? accumulator + 1 : accumulator,
0
- )
+ ),
}),
busyWorkerNodes: this.workerNodes.reduce(
- (accumulator, _workerNode, workerNodeKey) =>
+ (accumulator, _, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
0
),
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.queued,
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
- )
+ ),
}),
...(this.opts.enableTasksQueue === true && {
- backPressure: this.hasBackPressure()
+ backPressure: this.hasBackPressure(),
}),
...(this.opts.enableTasksQueue === true && {
stolenTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.stolen,
0
- )
+ ),
}),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate && {
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true && {
runTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.runTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.runTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.runTime.history),
+ accumulator.concat(
+ workerNode.usage.runTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
+ ),
+ }),
+ },
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ workerNode =>
+ workerNode.usage.waitTime.minimum ?? Number.POSITIVE_INFINITY
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ workerNode =>
+ workerNode.usage.waitTime.maximum ?? Number.NEGATIVE_INFINITY
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
+ ),
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator.concat(workerNode.usage.waitTime.history),
+ accumulator.concat(
+ workerNode.usage.waitTime.history.toArray()
+ ),
[]
)
)
- )
- })
- }
- })
+ ),
+ }),
+ },
+ }),
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate === true && {
+ elu: {
+ idle: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.idle.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.idle.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ active: {
+ minimum: round(
+ min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.minimum ??
+ Number.POSITIVE_INFINITY
+ )
+ )
+ ),
+ maximum: round(
+ max(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.maximum ??
+ Number.NEGATIVE_INFINITY
+ )
+ )
+ ),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.average && {
+ average: round(
+ average(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
+ .elu.median && {
+ median: round(
+ median(
+ this.workerNodes.reduce<number[]>(
+ (accumulator, workerNode) =>
+ accumulator.concat(
+ workerNode.usage.elu.active.history.toArray()
+ ),
+ []
+ )
+ )
+ ),
+ }),
+ },
+ utilization: {
+ average: round(
+ average(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.elu.utilization ?? 0
+ )
+ )
+ ),
+ },
+ },
+ }),
}
}
/**
- * The pool readiness boolean status.
+ * Whether the pool is ready or not.
+ * @returns The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
)
}
+ /**
+ * Whether the pool is empty or not.
+ * @returns The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ }
+
/**
* The approximate pool utilization.
- *
* @returns The pool utilization.
*/
private get utilization (): number {
+ if (this.startTimestamp == null) {
+ return 0
+ }
const poolTimeCapacity =
(performance.now() - this.startTimestamp) *
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.runTime.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
/**
* Checks if the worker id sent in the received message from a worker is valid.
- *
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
throw new Error('Worker message received without worker id')
} else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
- `Worker message received from unknown worker '${message.workerId}'`
+ `Worker message received from unknown worker '${message.workerId.toString()}'`
)
}
}
/**
* Gets the worker node key given its worker id.
- *
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
+ let requireSync = false
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
- this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
- this.opts.workerChoiceStrategy
- )
if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ requireSync = !this.setWorkerChoiceStrategyOptions(
+ workerChoiceStrategyOptions
+ )
}
- for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
- workerNode.resetUsage()
- this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
+ this.opts.workerChoiceStrategy = workerChoiceStrategy
+ this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+ this.opts.workerChoiceStrategy,
+ this.opts.workerChoiceStrategyOptions
+ )
+ requireSync = true
+ }
+ if (requireSync) {
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
}
}
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
- ): void {
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
+ ): boolean {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategiesContext?.setOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies(),
+ this.opts.workerChoiceStrategyOptions
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return true
}
- this.workerChoiceStrategyContext.setOptions(
- this.opts.workerChoiceStrategyOptions
- )
+ return false
}
/** @inheritDoc */
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.setTasksQueueOptions(tasksQueueOptions!)
+ this.setTasksQueueOptions(tasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ public setTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions | undefined
+ ): void {
if (this.opts.enableTasksQueue === true) {
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
}
private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
return {
...getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
),
- ...tasksQueueOptions
+ ...tasksQueueOptions,
}
}
}
private setTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'idle',
this.handleWorkerNodeIdleEvent
}
private setTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
this.handleWorkerNodeBackPressureEvent
}
private unsetTasksStealingOnBackPressure (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
this.handleWorkerNodeBackPressureEvent
/**
* Whether the pool is full or not.
- *
- * The pool filling boolean status.
+ * @returns The pool fullness boolean status.
*/
protected get full (): boolean {
return (
/**
* Whether the pool is busy or not.
- *
- * The pool busyness boolean status.
+ * @returns The pool busyness boolean status.
*/
protected abstract get busy (): boolean
/**
* Whether worker nodes are executing concurrently their tasks quota or not.
- *
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const workerId = this.getWorkerInfo(workerNodeKey).id!
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
) {
if (message.taskFunctionOperationStatus) {
resolve(true)
- } else if (!message.taskFunctionOperationStatus) {
+ } else {
reject(
new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${message.workerId} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- message.workerError!.message
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ message.workerError?.message
}'`
)
)
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${errorResponse!
- .workerId!} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- errorResponse!.workerError!.message
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ errorResponse?.workerError?.message
}'`
)
)
}
}
}
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(
workerNodeKey,
taskFunctionOperationsListener
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
- for (const workerNode of this.workerNodes) {
- if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.includes(name)
- ) {
- return true
- }
- }
- return false
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- fn: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
if (typeof name !== 'string') {
throw new TypeError('name argument must be a string')
if (typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
- if (typeof fn !== 'function') {
- throw new TypeError('fn argument must be a function')
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
+ }
+ if (typeof fn.taskFunction !== 'function') {
+ throw new TypeError('taskFunction property must be a function')
}
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: name,
- taskFunction: fn.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+ taskFunction: fn.taskFunction.toString(),
})
this.taskFunctions.set(name, fn)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
})
- this.deleteTaskFunctionWorkerUsages(name)
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
this.taskFunctions.delete(name)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
return opResult
}
/** @inheritDoc */
- public listTaskFunctionNames (): string[] {
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.length > 0
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
) {
- return workerNode.info.taskFunctionNames
+ return workerNode.info.taskFunctionsProperties
}
}
return []
}
+ /**
+ * Gets task function worker choice strategy, if any.
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
+ }
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function priority, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+
+ /**
+ * Gets the worker choice strategies registered in this pool.
+ * @returns The worker choice strategies.
+ */
+ private readonly getWorkerChoiceStrategies =
+ (): Set<WorkerChoiceStrategy> => {
+ return new Set([
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.workerChoiceStrategy!,
+ ...this.listTaskFunctionsProperties()
+ .map(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.strategy
+ )
+ .filter(
+ (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+ ),
+ ])
+ }
+
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'default',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
})
}
- private deleteTaskFunctionWorkerUsages (name: string): void {
- for (const workerNode of this.workerNodes) {
- workerNode.deleteTaskFunctionWorkerUsage(name)
- }
- }
-
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
public async execute (
data?: Data,
name?: string,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
+ const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+ workerNodeKey,
+ name
+ ),
transferList,
timestamp,
- taskId: randomUUID()
+ taskId: randomUUID(),
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.set(task.taskId!, {
...(this.emitter != null && {
asyncResource: new AsyncResource('poolifier:task', {
triggerAsyncId: this.emitter.asyncId,
- requireManualDestroy: true
- })
- })
+ requireManualDestroy: true,
+ }),
+ }),
})
if (
this.opts.enableTasksQueue === false ||
})
}
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (data == null) {
+ throw new TypeError('data argument must be a defined iterable')
+ }
+ if (typeof data[Symbol.iterator] !== 'function') {
+ throw new TypeError('data argument must be an iterable')
+ }
+ if (!Array.isArray(data)) {
+ data = [...data]
+ }
+ return Promise.all(
+ (data as Data[]).map(data => this.execute(data, name, transferList))
+ )
+ }
+
+ /**
+ * Starts the minimum number of workers.
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
+ */
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
+ if (this.minimumNumberOfWorkers === 0) {
+ return
+ }
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
/** @inheritdoc */
public start (): void {
if (this.started) {
throw new Error('Cannot start a destroying pool')
}
this.starting = true
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.minimumNumberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
+ this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
this.starting = false
this.started = true
}
}
this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+ this.workerNodes.map(async (_, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
- this.emitter?.removeAllListeners()
this.readyEventEmitted = false
+ delete this.startTimestamp
this.destroying = false
this.started = false
}
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (this.workerNodes?.[workerNodeKey] == null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
resolve()
return
}
} else if (message.kill === 'failure') {
reject(
new Error(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- `Kill message handling failed on worker ${message.workerId!}`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Kill message handling failed on worker ${message.workerId?.toString()}`
)
)
}
/**
* Terminates the worker node given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
/**
* Setup hook to execute code before worker nodes are created in the abstract constructor.
* Can be overridden.
- *
- * @virtual
*/
protected setupHook (): void {
/* Intentionally empty */
}
/**
- * Should return whether the worker is the main worker or not.
+ * Returns whether the worker is the main worker or not.
+ * @returns `true` if the worker is the main worker, `false` otherwise.
*/
protected abstract isMain (): boolean
/**
* Hook executed before the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
workerNodeKey: number,
task: Task<Data>
): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
task
)
].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
task
)
/**
* Hook executed after the worker task execution.
* Can be overridden.
- *
* @param workerNodeKey - The worker node key.
* @param message - The received message.
*/
workerNodeKey: number,
message: MessageValue<Response>
): void {
- let needWorkerChoiceStrategyUpdate = false
+ let needWorkerChoiceStrategiesUpdate = false
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
- needWorkerChoiceStrategyUpdate = true
+ needWorkerChoiceStrategiesUpdate = true
}
- if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ if (needWorkerChoiceStrategiesUpdate) {
+ this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
/**
* Whether the worker node shall update its task function worker usage or not.
- *
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
*/
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctionNames) &&
- workerInfo.taskFunctionNames.length > 2
+ Array.isArray(workerInfo.taskFunctionsProperties) &&
+ workerInfo.taskFunctionsProperties.length > 2
)
}
/**
* Chooses a worker node for the next task.
- *
- * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
- *
- * @returns The chosen worker node key
+ * @param name - The task function name.
+ * @returns The chosen worker node key.
*/
- private chooseWorkerNode (): number {
+ private chooseWorkerNode (name?: string): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
return workerNodeKey
}
}
- return this.workerChoiceStrategyContext.execute()
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.workerChoiceStrategiesContext!.execute(
+ this.getTaskFunctionWorkerChoiceStrategy(name)
+ )
}
/**
* Conditions for dynamic worker creation.
- *
* @returns Whether to create a dynamic worker or not.
*/
protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param message - The message.
* @param transferList - The optional array of transferable objects.
protected abstract sendToWorker (
workerNodeKey: number,
message: MessageValue<Data>,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): void
+ /**
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ * @param workerNode - The worker node.
+ */
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode =>
+ workerNode.usage.elu.active.aggregate ?? Number.POSITIVE_INFINITY
+ )
+ )
+ }
+ }
+
/**
* Creates a new, completely set up worker node.
- *
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
'error',
this.opts.errorHandler ?? EMPTY_FUNCTION
)
- workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
if (
) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
- } else {
- this.createAndSetupWorkerNode()
+ } else if (!this.startingMinimumNumberOfWorkers) {
+ this.startMinimumNumberOfWorkers(true)
}
}
if (
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode?.terminate().catch(error => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
})
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
this.removeWorkerNode(workerNode)
+ if (
+ this.started &&
+ !this.startingMinimumNumberOfWorkers &&
+ !this.destroying
+ ) {
+ this.startMinimumNumberOfWorkers(true)
+ }
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
/**
* Creates a new, completely set up dynamic worker node.
- *
* @returns New, completely set up dynamic worker node key.
*/
protected createAndSetupDynamicWorkerNode (): number {
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerInfo = this.getWorkerInfo(localWorkerNodeKey)
+ const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
((this.opts.enableTasksQueue === false &&
workerUsage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
+ workerInfo != null &&
+ !workerInfo.stealing &&
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
- this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
})
this.sendToWorker(workerNodeKey, {
- checkActive: true
+ checkActive: true,
})
if (this.taskFunctions.size > 0) {
- for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
this.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName,
- taskFunction: taskFunction.toString()
- }).catch(error => {
+ taskFunctionProperties: buildTaskFunctionProperties(
+ taskFunctionName,
+ taskFunctionObject
+ ),
+ taskFunction: taskFunctionObject.taskFunction.toString(),
+ }).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
+ true ||
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
/**
* Registers a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Registers once a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Deregisters a listener callback on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param listener - The message listener callback.
*/
/**
* Method hooked up after a worker node has been newly created.
* Can be overridden.
- *
* @param workerNodeKey - The newly created worker node key.
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
/**
* Sends the startup message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
* Sends the statistics message to worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
*/
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate,
- elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu.aggregate
- }
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate ?? false,
+ elu:
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate ?? false,
+ },
})
}
}
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- if (workerNodeKey === -1 || this.cannotStealTask()) {
+ private redistributeQueuedTasks (sourceWorkerNodeKey: number): void {
+ if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) {
return
}
- while (this.tasksQueueSize(workerNodeKey) > 0) {
+ while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.info.ready &&
+ return sourceWorkerNodeKey !== workerNodeKey &&
+ workerNode.info.ready &&
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
this.handleTask(
destinationWorkerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.dequeueTask(workerNodeKey)!
+ this.dequeueTask(sourceWorkerNodeKey)!
)
}
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.stolen
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
}
}
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string,
+ previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
- }
-
- private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
const taskFunctionWorkerUsage =
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
workerNode.getTaskFunctionWorkerUsage(taskName)!
- ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ if (
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 ||
+ (previousTaskName != null &&
+ previousTaskName === taskName &&
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0)
+ ) {
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) {
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
}
}
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
- workerNodeKey: number
+ workerNodeKey: number,
+ taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
- }
-
- private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
- workerNodeKey: number,
- taskName: string
- ): void {
- const workerNode = this.workerNodes[workerNodeKey]
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage =
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerNode.getTaskFunctionWorkerUsage(taskName)!
- taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ )!.tasks.sequentiallyStolen = 0
}
}
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey property must be defined'
+ "WorkerNode event detail 'workerNodeKey' property must be defined"
+ )
+ }
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
if (previousStolenTask != null) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
- workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.getWorkerInfo(workerNodeKey).stealing = false
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames!) {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- taskName
- )
- }
- this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ workerInfo.stealing = false
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ previousStolenTask.name!
+ )
return
}
- this.getWorkerInfo(workerNodeKey).stealing = true
+ workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
- if (
- this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- stolenTask != null
- ) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const taskFunctionTasksWorkerUsage = this.workerNodes[
- workerNodeKey
+ if (stolenTask != null) {
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
- if (
- taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
- (previousStolenTask != null &&
- previousStolenTask.name === stolenTask.name &&
- taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
- ) {
- this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- } else {
- this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
- workerNodeKey,
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- stolenTask.name!
- )
- }
+ stolenTask.name!,
+ previousStolenTask?.name
+ )
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(EMPTY_FUNCTION)
+ .catch((error: unknown) => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
}
private readonly workerNodeStealTask = (
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
): void => {
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ this.hasBackPressure() ||
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
return
}
- const { workerId } = eventDetail
const sizeOffset = 1
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
+ const { workerId } = eventDetail
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- this.getWorkerInfo(workerNodeKey).stealing = true
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueLastPrioritizedTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
}
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
+ * @param message - The message received from the worker.
*/
protected readonly workerMessageListener = (
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const { workerId, ready, taskId, taskFunctionNames } = message
- if (ready != null && taskFunctionNames != null) {
+ const { workerId, ready, taskId, taskFunctionsProperties } = message
+ if (ready != null && taskFunctionsProperties != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
+ } else if (taskFunctionsProperties != null) {
+ // Task function properties message received from worker
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
+ }
} else if (taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (taskFunctionNames != null) {
- // Task function names message received from worker
- this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ }
+ }
+
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- const { workerId, ready, taskFunctionNames } = message
+ const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- throw new Error(`Worker ${workerId!} failed to initialize`)
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
}
- const workerNode =
- this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
- workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.readyEventEmitted = true
- this.emitter?.emit(PoolEvents.ready, this.info)
- }
+ workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
- if (this.opts.enableTasksQueue === true && !this.destroying) {
+ if (
+ this.opts.enableTasksQueue === true &&
+ !this.destroying &&
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode != null
+ ) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
workerNode.emit('idle', {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerId: workerId!,
- workerNodeKey
+ workerId,
+ workerNodeKey,
})
}
}
/**
* Gets the worker information given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
- *
* @returns The created worker node.
*/
private createWorkerNode (): IWorkerNode<Worker, Data> {
this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
- ).size
+ ).size,
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority(),
}
)
// Flag the worker node as ready at pool startup.
/**
* Adds the given worker node in the pool worker nodes.
- *
* @param workerNode - The worker node.
* @returns The added worker node key.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
- *
* @param workerNode - The worker node.
*/
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ this.workerChoiceStrategiesContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
- }
-
- /** @inheritDoc */
- public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
- return (
- this.opts.enableTasksQueue === true &&
- this.workerNodes[workerNodeKey].hasBackPressure()
- )
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {
/**
* Executes the given task on the worker given its worker node key.
- *
* @param workerNodeKey - The worker node key.
* @param task - The task to execute.
*/
}
private flushTasksQueues (): void {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
+ for (const workerNodeKey of this.workerNodes.keys()) {
this.flushTasksQueue(workerNodeKey)
}
}