-import crypto from 'node:crypto'
+import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import type {
+ MessageValue,
+ PromiseResponseWrapper,
+ Task
+} from '../utility-types'
import {
+ DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
+ isKillBehavior,
isPlainObject,
- median
+ median,
+ round
} from '../utils'
-import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
+import { KillBehaviors } from '../worker/worker-options'
import {
type IPool,
PoolEmitter,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions,
- type WorkerType
+ type TasksQueueOptions
} from './pool'
import type {
IWorker,
- Task,
- TaskStatistics,
- WorkerNode,
+ IWorkerNode,
+ MessageHandler,
+ WorkerInfo,
+ WorkerType,
WorkerUsage
} from './worker'
import {
+ Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
+import { WorkerNode } from './worker-node'
/**
* Base class that implements some shared logic for all poolifier pools.
*
* @typeParam Worker - Type of worker which manages this pool.
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of execution response. This can only be serializable data.
+ * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
+ * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
*/
export abstract class AbstractPool<
Worker extends IWorker,
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+ public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
Response
>
+ /**
+ * The start timestamp of the pool.
+ */
+ private readonly startTimestamp
+
/**
* Constructs a new poolifier pool.
*
this.setupHook()
- for (let i = 1; i <= this.numberOfWorkers; i++) {
+ while (this.workerNodes.length < this.numberOfWorkers) {
this.createAndSetupWorker()
}
+
+ this.startTimestamp = performance.now()
}
private checkFilePath (filePath: string): void {
'Cannot instantiate a pool with a negative number of workers'
)
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
- throw new Error('Cannot instantiate a fixed pool with no worker')
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic) {
+ if (min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ } else if (min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
+ }
}
}
'Invalid worker choice strategy options: must have a weight for each worker node'
)
}
+ if (
+ workerChoiceStrategyOptions.measurement != null &&
+ !Object.values(Measurements).includes(
+ workerChoiceStrategyOptions.measurement
+ )
+ ) {
+ throw new Error(
+ `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+ )
+ }
}
private checkValidTasksQueueOptions (
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
- if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
+ ) {
+ throw new TypeError(
+ 'Invalid worker tasks concurrency: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ tasksQueueOptions.concurrency <= 0
+ ) {
throw new Error(
- `Invalid worker tasks concurrency '${
- tasksQueueOptions.concurrency as number
- }'`
+ `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
)
}
}
/** @inheritDoc */
public get info (): PoolInfo {
return {
+ version,
type: this.type,
worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
maxSize: this.maxSize,
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.workerUsage.tasks.executing === 0
+ workerNode.usage.tasks.executing === 0
? accumulator + 1
: accumulator,
0
),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.workerUsage.tasks.executing > 0
- ? accumulator + 1
- : accumulator,
+ workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
0
),
executedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.workerUsage.tasks.executed,
+ accumulator + workerNode.usage.tasks.executed,
0
),
executingTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.workerUsage.tasks.executing,
+ accumulator + workerNode.usage.tasks.executing,
0
),
queuedTasks: this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.queued,
0
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.tasksQueue.maxSize,
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
0
),
failedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.workerUsage.tasks.failed,
+ accumulator + workerNode.usage.tasks.failed,
0
- )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate && {
+ runTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ }),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ waitTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ })
}
}
+ private get starting (): boolean {
+ return (
+ this.workerNodes.length < this.minSize ||
+ (this.workerNodes.length >= this.minSize &&
+ this.workerNodes.some(workerNode => !workerNode.info.ready))
+ )
+ }
+
+ private get ready (): boolean {
+ return (
+ this.workerNodes.length >= this.minSize &&
+ this.workerNodes.every(workerNode => workerNode.info.ready)
+ )
+ }
+
+ /**
+ * Gets the approximate pool utilization.
+ *
+ * @returns The pool utilization.
+ */
+ private get utilization (): number {
+ const poolRunTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
+ const totalTasksRunTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ )
+ const totalTasksWaitTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ )
+ return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
+ }
+
/**
* Pool type.
*
*/
protected abstract get maxSize (): number
+ /**
+ * Get the worker given its id.
+ *
+ * @param workerId - The worker id.
+ * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
+ */
+ private getWorkerById (workerId: number): Worker | undefined {
+ return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
+ ?.worker
+ }
+
+ /**
+ * Checks if the worker id sent in the received message from a worker is valid.
+ *
+ * @param message - The received message.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
+ */
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
* @param worker - The worker.
- * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
+ * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
*/
private getWorkerNodeKey (worker: Worker): number {
return this.workerNodes.findIndex(
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(
- workerNode,
- this.getWorkerUsage(workerNode.worker)
- )
+ workerNode.resetUsage()
this.setWorkerStatistics(workerNode.worker)
}
}
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.workerUsage.tasks.executing === 0
+ return workerNode.usage.tasks.executing === 0
}) === -1
)
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
- name,
+ name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
- id: crypto.randomUUID()
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
+ id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
this.promiseResponseMap.set(submittedTask.id as string, {
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
- this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
+ this.workerNodes[workerNodeKey].usage.tasks.executing >=
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
this.workerNodes.map(async (workerNode, workerNodeKey) => {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
+ const workerExitPromise = new Promise<void>(resolve => {
+ workerNode.worker.on('exit', () => {
+ resolve()
+ })
+ })
await this.destroyWorker(workerNode.worker)
+ await workerExitPromise
})
)
}
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
- * Setup hook to execute code before worker node are created in the abstract constructor.
- * Can be overridden
+ * Setup hook to execute code before worker nodes are created in the abstract constructor.
+ * Can be overridden.
*
* @virtual
*/
workerNodeKey: number,
task: Task<Data>
): void {
- const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerUsage =
- this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
}
private updateTaskStatisticsWorkerUsage (
): void {
const workerTaskStatistics = workerUsage.tasks
--workerTaskStatistics.executing
- ++workerTaskStatistics.executed
- if (message.taskError != null) {
+ if (message.taskError == null) {
+ ++workerTaskStatistics.executed
+ } else {
++workerTaskStatistics.failed
}
}
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.aggregate
) {
- workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
+ const taskRunTime = message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregate =
+ (workerUsage.runTime.aggregate ?? 0) + taskRunTime
+ workerUsage.runTime.minimum = Math.min(
+ taskRunTime,
+ workerUsage.runTime?.minimum ?? Infinity
+ )
+ workerUsage.runTime.maximum = Math.max(
+ taskRunTime,
+ workerUsage.runTime?.maximum ?? -Infinity
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
- workerUsage.runTime.aggregate /
- (workerUsage.tasks.executed - workerUsage.tasks.failed)
+ workerUsage.runTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
.aggregate
) {
- workerUsage.waitTime.aggregate += taskWaitTime ?? 0
+ workerUsage.waitTime.aggregate =
+ (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
+ workerUsage.waitTime.minimum = Math.min(
+ taskWaitTime,
+ workerUsage.waitTime?.minimum ?? Infinity
+ )
+ workerUsage.waitTime.maximum = Math.max(
+ taskWaitTime,
+ workerUsage.waitTime?.maximum ?? -Infinity
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
- workerUsage.waitTime.aggregate /
- (workerUsage.tasks.executed - workerUsage.tasks.failed)
+ workerUsage.waitTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.median &&
- taskWaitTime != null
+ .waitTime.median
) {
workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
.aggregate
) {
- if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else if (message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .average &&
- workerUsage.tasks.executed !== 0
- ) {
- const executedTasks =
- workerUsage.tasks.executed - workerUsage.tasks.failed
- workerUsage.elu.idle.average =
- workerUsage.elu.idle.aggregate / executedTasks
- workerUsage.elu.active.average =
- workerUsage.elu.active.aggregate / executedTasks
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .median &&
- message.taskPerformance?.elu != null
- ) {
- workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
- workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
- workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
- workerUsage.elu.active.median = median(workerUsage.elu.active.history)
+ if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate =
+ (workerUsage.elu.idle?.aggregate ?? 0) +
+ message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate =
+ (workerUsage.elu.active?.aggregate ?? 0) +
+ message.taskPerformance.elu.active
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ workerUsage.elu.idle.minimum = Math.min(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle?.minimum ?? Infinity
+ )
+ workerUsage.elu.idle.maximum = Math.max(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle?.maximum ?? -Infinity
+ )
+ workerUsage.elu.active.minimum = Math.min(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active?.minimum ?? Infinity
+ )
+ workerUsage.elu.active.maximum = Math.max(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active?.maximum ?? -Infinity
+ )
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / workerUsage.tasks.executed
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median
+ ) {
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(
+ message.taskPerformance.elu.active
+ )
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
+ }
}
}
}
* @param worker - The worker which should register a listener.
* @param listener - The message listener callback.
*/
- protected abstract registerWorkerMessageListener<
- Message extends Data | Response
- >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
+ private registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ worker.on('message', listener as MessageHandler<Worker>)
+ }
/**
* Creates a new worker.
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
- *
- * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
+ * Can be overridden.
*
* @param worker - The newly created worker.
*/
- protected abstract afterWorkerSetup (worker: Worker): void
+ protected afterWorkerSetup (worker: Worker): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
+ }
/**
* Creates a new worker and sets it up completely in the pool worker nodes.
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- if (this.opts.restartWorkerOnError === true) {
- this.createAndSetupWorker()
+ if (this.opts.restartWorkerOnError === true && !this.starting) {
+ if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
+ this.createAndSetupDynamicWorker()
+ } else {
+ this.createAndSetupWorker()
+ }
+ }
+ if (this.opts.enableTasksQueue === true) {
+ this.redistributeQueuedTasks(worker)
}
})
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
this.pushWorkerNode(worker)
- this.setWorkerStatistics(worker)
-
this.afterWorkerSetup(worker)
return worker
}
+ private redistributeQueuedTasks (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
/**
* Creates a new dynamic worker and sets it up completely in the pool worker nodes.
*
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
((this.opts.enableTasksQueue === false &&
- this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
- 0) ||
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
(this.opts.enableTasksQueue === true &&
- this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
- 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0)))
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
void (this.destroyWorker(worker) as Promise<void>)
}
})
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.id != null) {
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.workerId != null) {
+ // Worker ready message received
+ this.handleWorkerReadyMessage(message)
+ } else if (message.id != null) {
// Task execution response received
- const promiseResponse = this.promiseResponseMap.get(message.id)
- if (promiseResponse != null) {
- if (message.taskError != null) {
- promiseResponse.reject(message.taskError.message)
- if (this.emitter != null) {
- this.emitter.emit(PoolEvents.taskError, message.taskError)
- }
- } else {
- promiseResponse.resolve(message.data as Response)
- }
- this.afterTaskExecutionHook(promiseResponse.worker, message)
- this.promiseResponseMap.delete(message.id)
- const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
- if (
- this.opts.enableTasksQueue === true &&
- this.tasksQueueSize(workerNodeKey) > 0
- ) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.handleTaskExecutionResponse(message)
+ }
+ }
+ }
+
+ private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const promiseResponse = this.promiseResponseMap.get(message.id as string)
+ if (promiseResponse != null) {
+ if (message.taskError != null) {
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
+ } else {
+ promiseResponse.resolve(message.data as Response)
+ }
+ this.afterTaskExecutionHook(promiseResponse.worker, message)
+ this.promiseResponseMap.delete(message.id as string)
+ const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
private checkAndEmitEvents (): void {
if (this.emitter != null) {
if (this.busy) {
- this.emitter?.emit(PoolEvents.busy, this.info)
+ this.emitter.emit(PoolEvents.busy, this.info)
}
if (this.type === PoolTypes.dynamic && this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
+ this.emitter.emit(PoolEvents.full, this.info)
}
}
}
/**
- * Sets the given worker node its tasks usage in the pool.
+ * Gets the worker information.
*
- * @param workerNode - The worker node.
- * @param workerUsage - The worker usage.
+ * @param workerNodeKey - The worker node key.
*/
- private setWorkerNodeTasksUsage (
- workerNode: WorkerNode<Worker, Data>,
- workerUsage: WorkerUsage
- ): void {
- workerNode.workerUsage = workerUsage
+ private getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ return this.workerNodes[workerNodeKey].info
}
/**
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- return this.workerNodes.push({
- worker,
- workerUsage: this.getWorkerUsage(worker),
- tasksQueue: new Queue<Task<Data>>()
- })
+ return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
- // /**
- // * Sets the given worker in the pool worker nodes.
- // *
- // * @param workerNodeKey - The worker node key.
- // * @param worker - The worker.
- // * @param workerUsage - The worker usage.
- // * @param tasksQueue - The worker task queue.
- // */
- // private setWorkerNode (
- // workerNodeKey: number,
- // worker: Worker,
- // workerUsage: WorkerUsage,
- // tasksQueue: Queue<Task<Data>>
- // ): void {
- // this.workerNodes[workerNodeKey] = {
- // worker,
- // workerUsage,
- // tasksQueue
- // }
- // }
-
/**
* Removes the given worker from the pool worker nodes.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ return this.workerNodes[workerNodeKey].enqueueTask(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.size
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
}
private flushTasksQueue (workerNodeKey: number): void {
- if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
+ this.workerNodes[workerNodeKey].clearTasksQueue()
}
private flushTasksQueues (): void {
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
- })
- }
-
- private getWorkerUsage (worker: Worker): WorkerUsage {
- return {
- tasks: this.getTaskStatistics(worker),
- runTime: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
},
- waitTime: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- elu: {
- idle: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- active: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- utilization: 0
- }
- }
- }
-
- private getTaskStatistics (worker: Worker): TaskStatistics {
- const queueSize =
- this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
- return {
- executed: 0,
- executing: 0,
- get queued (): number {
- return queueSize ?? 0
- },
- failed: 0
- }
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
}
}