import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import type {
+ MessageValue,
+ PromiseResponseWrapper,
+ Task
+} from '../utility-types'
import {
+ DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
isKillBehavior,
IWorker,
IWorkerNode,
MessageHandler,
- Task,
WorkerInfo,
WorkerType,
WorkerUsage
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.tasks.maxQueued,
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
0
),
failedTasks: this.workerNodes.reduce(
private get starting (): boolean {
return (
- !this.full ||
- (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ this.workerNodes.length < this.minSize ||
+ (this.workerNodes.length >= this.minSize &&
+ this.workerNodes.some(workerNode => !workerNode.info.ready))
)
}
private get ready (): boolean {
return (
- this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ this.workerNodes.length >= this.minSize &&
+ this.workerNodes.every(workerNode => workerNode.info.ready)
)
}
?.worker
}
+ /**
+ * Checks if the worker id sent in the received message from a worker is valid.
+ *
+ * @param message - The received message.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
+ */
private checkMessageWorkerId (message: MessageValue<Response>): void {
if (
message.workerId != null &&
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
- name,
+ name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
}
private updateTaskStatisticsWorkerUsage (
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- if (this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(worker)
- }
if (this.opts.restartWorkerOnError === true && !this.starting) {
if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
this.createAndSetupDynamicWorker()
this.createAndSetupWorker()
}
}
+ if (this.opts.enableTasksQueue === true) {
+ this.redistributeQueuedTasks(worker)
+ }
})
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)