-import crypto from 'node:crypto'
+import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
import {
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions,
- type WorkerType,
- WorkerTypes
+ type TasksQueueOptions
} from './pool'
import type {
IWorker,
+ IWorkerNode,
MessageHandler,
Task,
- WorkerNode,
+ WorkerInfo,
+ WorkerType,
WorkerUsage
} from './worker'
import {
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
+import { WorkerNode } from './worker-node'
/**
* Base class that implements some shared logic for all poolifier pools.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+ public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
'Cannot instantiate a pool with a negative number of workers'
)
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
- throw new Error('Cannot instantiate a fixed pool with no worker')
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic && min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ } else if (this.type === PoolTypes.dynamic && min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
}
}
/** @inheritDoc */
public get info (): PoolInfo {
return {
+ version,
type: this.type,
worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
maxSize: this.maxSize,
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
(accumulator, workerNode) =>
accumulator + workerNode.usage.tasks.failed,
0
- )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate && {
+ runTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ }),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ waitTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ })
}
}
- /**
- * Gets the pool run time.
- *
- * @returns The pool run time in milliseconds.
- */
- private get runTime (): number {
- return performance.now() - this.startTimestamp
+ private get starting (): boolean {
+ return (
+ !this.full ||
+ (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ )
+ }
+
+ private get ready (): boolean {
+ return (
+ this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ )
}
/**
* @returns The pool utilization.
*/
private get utilization (): number {
- const poolRunTimeCapacity = this.runTime * this.maxSize
+ const poolRunTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.runTime.aggregate,
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.waitTime.aggregate,
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
?.worker
}
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
if (workerChoiceStrategyOptions != null) {
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
- for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
- this.setWorkerNodeTasksUsage(
- workerNode,
- this.getWorkerUsage(workerNodeKey)
- )
+ for (const workerNode of this.workerNodes) {
+ workerNode.resetUsage()
this.setWorkerStatistics(workerNode.worker)
}
}
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
- id: crypto.randomUUID()
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
+ id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
this.promiseResponseMap.set(submittedTask.id as string, {
this.workerNodes.map(async (workerNode, workerNodeKey) => {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
+ const workerExitPromise = new Promise<void>(resolve => {
+ workerNode.worker.on('exit', () => {
+ resolve()
+ })
+ })
await this.destroyWorker(workerNode.worker)
+ await workerExitPromise
})
)
}
): void {
const workerTaskStatistics = workerUsage.tasks
--workerTaskStatistics.executing
- ++workerTaskStatistics.executed
- if (message.taskError != null) {
+ if (message.taskError == null) {
+ ++workerTaskStatistics.executed
+ } else {
++workerTaskStatistics.failed
}
}
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.aggregate
) {
- workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
+ const taskRunTime = message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregate =
+ (workerUsage.runTime.aggregate ?? 0) + taskRunTime
+ workerUsage.runTime.minimum = Math.min(
+ taskRunTime,
+ workerUsage.runTime?.minimum ?? Infinity
+ )
+ workerUsage.runTime.maximum = Math.max(
+ taskRunTime,
+ workerUsage.runTime?.maximum ?? -Infinity
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
- workerUsage.runTime.aggregate /
- (workerUsage.tasks.executed - workerUsage.tasks.failed)
+ workerUsage.runTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
.aggregate
) {
- workerUsage.waitTime.aggregate += taskWaitTime ?? 0
+ workerUsage.waitTime.aggregate =
+ (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
+ workerUsage.waitTime.minimum = Math.min(
+ taskWaitTime,
+ workerUsage.waitTime?.minimum ?? Infinity
+ )
+ workerUsage.waitTime.maximum = Math.max(
+ taskWaitTime,
+ workerUsage.waitTime?.maximum ?? -Infinity
+ )
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
- workerUsage.waitTime.aggregate /
- (workerUsage.tasks.executed - workerUsage.tasks.failed)
+ workerUsage.waitTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime.median &&
- taskWaitTime != null
+ .waitTime.median
) {
workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
.aggregate
) {
- if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
- workerUsage.elu.utilization =
- (workerUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- } else if (message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
- workerUsage.elu.utilization = message.taskPerformance.elu.utilization
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .average &&
- workerUsage.tasks.executed !== 0
- ) {
- const executedTasks =
- workerUsage.tasks.executed - workerUsage.tasks.failed
- workerUsage.elu.idle.average =
- workerUsage.elu.idle.aggregate / executedTasks
- workerUsage.elu.active.average =
- workerUsage.elu.active.aggregate / executedTasks
- }
- if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
- .median &&
- message.taskPerformance?.elu != null
- ) {
- workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
- workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
- workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
- workerUsage.elu.active.median = median(workerUsage.elu.active.history)
+ if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate =
+ (workerUsage.elu.idle?.aggregate ?? 0) +
+ message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate =
+ (workerUsage.elu.active?.aggregate ?? 0) +
+ message.taskPerformance.elu.active
+ if (workerUsage.elu.utilization != null) {
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else {
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ workerUsage.elu.idle.minimum = Math.min(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle?.minimum ?? Infinity
+ )
+ workerUsage.elu.idle.maximum = Math.max(
+ message.taskPerformance.elu.idle,
+ workerUsage.elu.idle?.maximum ?? -Infinity
+ )
+ workerUsage.elu.active.minimum = Math.min(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active?.minimum ?? Infinity
+ )
+ workerUsage.elu.active.maximum = Math.max(
+ message.taskPerformance.elu.active,
+ workerUsage.elu.active?.maximum ?? -Infinity
+ )
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / workerUsage.tasks.executed
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median
+ ) {
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(
+ message.taskPerformance.elu.active
+ )
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
+ }
}
}
}
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
}
/**
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- if (this.opts.restartWorkerOnError === true) {
- this.createAndSetupWorker()
+ if (this.opts.enableTasksQueue === true) {
+ this.redistributeQueuedTasks(worker)
+ }
+ if (this.opts.restartWorkerOnError === true && !this.starting) {
+ if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
+ this.createAndSetupDynamicWorker()
+ } else {
+ this.createAndSetupWorker()
+ }
}
})
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
this.pushWorkerNode(worker)
- this.setWorkerStatistics(worker)
-
this.afterWorkerSetup(worker)
return worker
}
+ private redistributeQueuedTasks (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
/**
* Creates a new dynamic worker and sets it up completely in the pool worker nodes.
*
void (this.destroyWorker(worker) as Promise<void>)
}
})
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.workerId != null && message.started != null) {
- // Worker started message received
- this.handleWorkerStartedMessage(message)
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.workerId != null) {
+ // Worker ready message received
+ this.handleWorkerReadyMessage(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
}
}
- private handleWorkerStartedMessage (message: MessageValue<Response>): void {
- // Worker started message received
- const worker = this.getWorkerById(message.workerId as number)
- if (worker != null) {
- this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
- message.started as boolean
- } else {
- throw new Error(
- `Worker started message received from unknown worker '${
- message.workerId as number
- }'`
- )
+ private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
}
}
private checkAndEmitEvents (): void {
if (this.emitter != null) {
if (this.busy) {
- this.emitter?.emit(PoolEvents.busy, this.info)
+ this.emitter.emit(PoolEvents.busy, this.info)
}
if (this.type === PoolTypes.dynamic && this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
+ this.emitter.emit(PoolEvents.full, this.info)
}
}
}
/**
- * Sets the given worker node its tasks usage in the pool.
+ * Gets the worker information.
*
- * @param workerNode - The worker node.
- * @param workerUsage - The worker usage.
+ * @param workerNodeKey - The worker node key.
*/
- private setWorkerNodeTasksUsage (
- workerNode: WorkerNode<Worker, Data>,
- workerUsage: WorkerUsage
- ): void {
- workerNode.usage = workerUsage
+ private getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ return this.workerNodes[workerNodeKey].info
}
/**
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- this.workerNodes.push({
- worker,
- info: { id: this.getWorkerId(worker), started: true },
- usage: this.getWorkerUsage(),
- tasksQueue: new Queue<Task<Data>>()
- })
- const workerNodeKey = this.getWorkerNodeKey(worker)
- this.setWorkerNodeTasksUsage(
- this.workerNodes[workerNodeKey],
- this.getWorkerUsage(workerNodeKey)
- )
- return this.workerNodes.length
+ return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
- /**
- * Gets the worker id.
- *
- * @param worker - The worker.
- * @returns The worker id.
- */
- private getWorkerId (worker: Worker): number | undefined {
- if (this.worker === WorkerTypes.thread) {
- return worker.threadId
- } else if (this.worker === WorkerTypes.cluster) {
- return worker.id
- }
- }
-
- // /**
- // * Sets the given worker in the pool worker nodes.
- // *
- // * @param workerNodeKey - The worker node key.
- // * @param worker - The worker.
- // * @param workerInfo - The worker info.
- // * @param workerUsage - The worker usage.
- // * @param tasksQueue - The worker task queue.
- // */
- // private setWorkerNode (
- // workerNodeKey: number,
- // worker: Worker,
- // workerInfo: WorkerInfo,
- // workerUsage: WorkerUsage,
- // tasksQueue: Queue<Task<Data>>
- // ): void {
- // this.workerNodes[workerNodeKey] = {
- // worker,
- // info: workerInfo,
- // usage: workerUsage,
- // tasksQueue
- // }
- // }
-
/**
* Removes the given worker from the pool worker nodes.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ return this.workerNodes[workerNodeKey].enqueueTask(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.size
- }
-
- private tasksMaxQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
}
private flushTasksQueue (workerNodeKey: number): void {
- if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
- this.workerNodes[workerNodeKey].tasksQueue.clear()
+ this.workerNodes[workerNodeKey].clearTasksQueue()
}
private flushTasksQueues (): void {
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
- })
- }
-
- private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
- const getTasksQueueSize = (workerNodeKey?: number): number => {
- return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
- }
- const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
- return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
- }
- return {
- tasks: {
- executed: 0,
- executing: 0,
- get queued (): number {
- return getTasksQueueSize(workerNodeKey)
- },
- get maxQueued (): number {
- return getTasksMaxQueueSize(workerNodeKey)
- },
- failed: 0
- },
- runTime: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- waitTime: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
},
- elu: {
- idle: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- active: {
- aggregate: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- utilization: 0
- }
- }
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
}
}