import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import { existsSync } from 'node:fs'
+import type {
+ MessageValue,
+ PromiseResponseWrapper,
+ Task
+} from '../utility-types'
import {
+ DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
isKillBehavior,
IWorker,
IWorkerNode,
MessageHandler,
- Task,
WorkerInfo,
WorkerType,
WorkerUsage
this.setupHook()
- while (this.workerNodes.length < this.numberOfWorkers) {
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
this.createAndSetupWorker()
}
private checkFilePath (filePath: string): void {
if (
filePath == null ||
+ typeof filePath !== 'string' ||
(typeof filePath === 'string' && filePath.trim().length === 0)
) {
throw new Error('Please specify a file with a worker implementation')
}
+ if (!existsSync(filePath)) {
+ throw new Error(`Cannot find the worker file '${filePath}'`)
+ }
}
private checkNumberOfWorkers (numberOfWorkers: number): void {
throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
- } else if (min === 0 && max === 0) {
+ } else if (max === 0) {
throw new RangeError(
- 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ 'Cannot instantiate a dynamic pool with a pool size equal to zero'
)
} else if (min === max) {
throw new RangeError(
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.tasks.maxQueued,
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
0
),
failedTasks: this.workerNodes.reduce(
private get starting (): boolean {
return (
- !this.full ||
- (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minSize
)
}
private get ready (): boolean {
return (
- this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic && workerNode.info.ready
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ) >= this.minSize
)
}
* @returns The pool utilization.
*/
private get utilization (): number {
- const poolRunTimeCapacity =
+ const poolTimeCapacity =
(performance.now() - this.startTimestamp) * this.maxSize
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
0
)
- return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
+ return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
/**
?.worker
}
+ /**
+ * Checks if the worker id sent in the received message from a worker is valid.
+ *
+ * @param message - The received message.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
+ */
private checkMessageWorkerId (message: MessageValue<Response>): void {
if (
message.workerId != null &&
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
- name,
+ name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
}
private updateTaskStatisticsWorkerUsage (
message: MessageValue<Data>
): void
- /**
- * Registers a listener callback on the given worker.
- *
- * @param worker - The worker which should register a listener.
- * @param listener - The message listener callback.
- */
- private registerWorkerMessageListener<Message extends Data | Response>(
- worker: Worker,
- listener: (message: MessageValue<Message>) => void
- ): void {
- worker.on('message', listener as MessageHandler<Worker>)
- }
-
/**
* Creates a new worker.
*
*/
protected abstract createWorker (): Worker
- /**
- * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
- * Can be overridden.
- *
- * @param worker - The newly created worker.
- */
- protected afterWorkerSetup (worker: Worker): void {
- // Listen to worker messages.
- this.registerWorkerMessageListener(worker, this.workerListener())
- // Send startup message to worker.
- this.sendToWorker(worker, {
- ready: false,
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
- })
- // Setup worker task statistics computation.
- this.setWorkerStatistics(worker)
- }
-
/**
* Creates a new worker and sets it up completely in the pool worker nodes.
*
worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ workerInfo.ready = false
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- if (this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(worker)
- }
if (this.opts.restartWorkerOnError === true && !this.starting) {
- if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
+ if (workerInfo.dynamic) {
this.createAndSetupDynamicWorker()
} else {
this.createAndSetupWorker()
}
}
+ if (this.opts.enableTasksQueue === true) {
+ this.redistributeQueuedTasks(workerNodeKey)
+ }
})
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
this.removeWorkerNode(worker)
})
- this.pushWorkerNode(worker)
+ this.addWorkerNode(worker)
this.afterWorkerSetup(worker)
return worker
}
- private redistributeQueuedTasks (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- while (this.tasksQueueSize(workerNodeKey) > 0) {
- let targetWorkerNodeKey: number = workerNodeKey
- let minQueuedTasks = Infinity
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued === 0
- ) {
- targetWorkerNodeKey = workerNodeId
- break
- }
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued < minQueuedTasks
- ) {
- minQueuedTasks = workerNode.usage.tasks.queued
- targetWorkerNodeKey = workerNodeId
- }
- }
- this.enqueueTask(
- targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
- }
-
/**
* Creates a new dynamic worker and sets it up completely in the pool worker nodes.
*
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
workerInfo.dynamic = true
+ if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+ workerInfo.ready = true
+ }
this.sendToWorker(worker, {
- checkAlive: true,
+ checkActive: true,
workerId: workerInfo.id as number
})
return worker
}
+ /**
+ * Registers a listener callback on the given worker.
+ *
+ * @param worker - The worker which should register a listener.
+ * @param listener - The message listener callback.
+ */
+ private registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ worker.on('message', listener as MessageHandler<Worker>)
+ }
+
+ /**
+ * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+ * Can be overridden.
+ *
+ * @param worker - The newly created worker.
+ */
+ protected afterWorkerSetup (worker: Worker): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendWorkerStartupMessage(worker)
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
+ }
+
+ private sendWorkerStartupMessage (worker: Worker): void {
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ }
+
+ private redistributeQueuedTasks (workerNodeKey: number): void {
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ const workerInfo = this.getWorkerInfo(workerNodeId)
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerInfo.ready &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerInfo.ready &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
/**
* This function is the listener registered for each worker message.
*
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.workerId != null) {
- // Worker ready message received
- this.handleWorkerReadyMessage(message)
+ if (message.ready != null) {
+ // Worker ready response received
+ this.handleWorkerReadyResponse(message)
} else if (message.id != null) {
// Task execution response received
this.handleTaskExecutionResponse(message)
}
}
- private handleWorkerReadyMessage (message: MessageValue<Response>): void {
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const worker = this.getWorkerById(message.workerId)
this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
message.ready as boolean
}
/**
- * Pushes the given worker in the pool worker nodes.
+ * Adds the given worker in the pool worker nodes.
*
* @param worker - The worker.
* @returns The worker nodes length.
*/
- private pushWorkerNode (worker: Worker): number {
- return this.workerNodes.push(new WorkerNode(worker, this.worker))
+ private addWorkerNode (worker: Worker): number {
+ const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ // Flag the worker node as ready at pool startup.
+ if (this.starting) {
+ workerNode.info.ready = true
+ }
+ return this.workerNodes.push(workerNode)
}
/**
}
}
+ /**
+ * Executes the given task on the given worker.
+ *
+ * @param worker - The worker.
+ * @param task - The task to execute.
+ */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)