import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import type {
+ MessageValue,
+ PromiseResponseWrapper,
+ Task
+} from '../utility-types'
import {
+ DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
isKillBehavior,
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions,
- type WorkerType,
- WorkerTypes
+ type TasksQueueOptions
} from './pool'
import type {
IWorker,
+ IWorkerNode,
MessageHandler,
- Task,
WorkerInfo,
- WorkerNode,
+ WorkerType,
WorkerUsage
} from './worker'
import {
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { version } from './version'
+import { WorkerNode } from './worker-node'
/**
* Base class that implements some shared logic for all poolifier pools.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+ public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
'Cannot instantiate a pool with a negative number of workers'
)
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
- throw new Error('Cannot instantiate a fixed pool with no worker')
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic) {
+ if (min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ } else if (min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
+ }
}
}
version,
type: this.type,
worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
maxSize: this.maxSize,
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.tasks.maxQueued,
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
0
),
failedTasks: this.workerNodes.reduce(
}
}
+ private get starting (): boolean {
+ return (
+ this.workerNodes.length < this.minSize ||
+ (this.workerNodes.length >= this.minSize &&
+ this.workerNodes.some(workerNode => !workerNode.info.ready))
+ )
+ }
+
+ private get ready (): boolean {
+ return (
+ this.workerNodes.length >= this.minSize &&
+ this.workerNodes.every(workerNode => workerNode.info.ready)
+ )
+ }
+
/**
* Gets the approximate pool utilization.
*
?.worker
}
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(
- workerNode,
- this.getInitialWorkerUsage(workerNode.worker)
- )
+ workerNode.resetUsage()
this.setWorkerStatistics(workerNode.worker)
}
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
- name,
+ name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ message.name as string
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
}
private updateTaskStatisticsWorkerUsage (
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
}
/**
if (this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(worker)
}
- if (this.opts.restartWorkerOnError === true) {
+ if (this.opts.restartWorkerOnError === true && !this.starting) {
if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
this.createAndSetupDynamicWorker()
} else {
this.pushWorkerNode(worker)
- this.setWorkerStatistics(worker)
-
this.afterWorkerSetup(worker)
return worker
*/
protected createAndSetupDynamicWorker (): Worker {
const worker = this.createAndSetupWorker()
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
this.registerWorkerMessageListener(worker, message => {
const workerNodeKey = this.getWorkerNodeKey(worker)
if (
void (this.destroyWorker(worker) as Promise<void>)
}
})
- this.sendToWorker(worker, { dynamic: true })
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.workerId != null && message.started != null) {
- // Worker started message received
- this.handleWorkerStartedMessage(message)
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.workerId != null) {
+ // Worker ready message received
+ this.handleWorkerReadyMessage(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
}
}
- private handleWorkerStartedMessage (message: MessageValue<Response>): void {
- // Worker started message received
- const worker = this.getWorkerById(message.workerId as number)
- if (worker != null) {
- this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
- message.started as boolean
- } else {
- throw new Error(
- `Worker started message received from unknown worker '${
- message.workerId as number
- }'`
- )
+ private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
}
}
}
}
- /**
- * Sets the given worker node its tasks usage in the pool.
- *
- * @param workerNode - The worker node.
- * @param workerUsage - The worker usage.
- */
- private setWorkerNodeTasksUsage (
- workerNode: WorkerNode<Worker, Data>,
- workerUsage: WorkerUsage
- ): void {
- workerNode.usage = workerUsage
- }
-
/**
* Gets the worker information.
*
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- this.workerNodes.push({
- worker,
- info: this.getInitialWorkerInfo(worker),
- usage: this.getInitialWorkerUsage(),
- tasksQueue: new Queue<Task<Data>>()
- })
- this.setWorkerNodeTasksUsage(
- this.workerNodes[this.getWorkerNodeKey(worker)],
- this.getInitialWorkerUsage(worker)
- )
- return this.workerNodes.length
- }
-
- /**
- * Gets the worker id.
- *
- * @param worker - The worker.
- * @returns The worker id.
- */
- private getWorkerId (worker: Worker): number | undefined {
- if (this.worker === WorkerTypes.thread) {
- return worker.threadId
- } else if (this.worker === WorkerTypes.cluster) {
- return worker.id
- }
+ return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
- // /**
- // * Sets the given worker in the pool worker nodes.
- // *
- // * @param workerNodeKey - The worker node key.
- // * @param worker - The worker.
- // * @param workerInfo - The worker info.
- // * @param workerUsage - The worker usage.
- // * @param tasksQueue - The worker task queue.
- // */
- // private setWorkerNode (
- // workerNodeKey: number,
- // worker: Worker,
- // workerInfo: WorkerInfo,
- // workerUsage: WorkerUsage,
- // tasksQueue: Queue<Task<Data>>
- // ): void {
- // this.workerNodes[workerNodeKey] = {
- // worker,
- // info: workerInfo,
- // usage: workerUsage,
- // tasksQueue
- // }
- // }
-
/**
* Removes the given worker from the pool worker nodes.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ return this.workerNodes[workerNodeKey].enqueueTask(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.size
- }
-
- private tasksMaxQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
}
private flushTasksQueue (workerNodeKey: number): void {
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
- this.workerNodes[workerNodeKey].tasksQueue.clear()
+ this.workerNodes[workerNodeKey].clearTasksQueue()
}
private flushTasksQueues (): void {
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
- })
- }
-
- private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
- const getTasksQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksQueueSize(this.getWorkerNodeKey(worker))
- }
- const getTasksMaxQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
- }
- return {
- tasks: {
- executed: 0,
- executing: 0,
- get queued (): number {
- return getTasksQueueSize(worker)
- },
- get maxQueued (): number {
- return getTasksMaxQueueSize(worker)
- },
- failed: 0
},
- runTime: {
- history: new CircularArray()
- },
- waitTime: {
- history: new CircularArray()
- },
- elu: {
- idle: {
- history: new CircularArray()
- },
- active: {
- history: new CircularArray()
- }
- }
- }
- }
-
- private getInitialWorkerInfo (worker: Worker): WorkerInfo {
- return { id: this.getWorkerId(worker), dynamic: false, started: true }
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
}
}