import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
+import type {
+ MessageValue,
+ PromiseResponseWrapper,
+ Task
+} from '../utility-types'
import {
+ DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
isKillBehavior,
IWorker,
IWorkerNode,
MessageHandler,
- Task,
WorkerInfo,
WorkerType,
WorkerUsage
}
protected checkDynamicPoolSize (min: number, max: number): void {
- if (this.type === PoolTypes.dynamic && min > max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
- )
- } else if (this.type === PoolTypes.dynamic && min === max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
- )
+ if (this.type === PoolTypes.dynamic) {
+ if (min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ } else if (min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
+ }
}
}
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.usage.tasks.maxQueued,
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
0
),
failedTasks: this.workerNodes.reduce(
private get starting (): boolean {
return (
- !this.full ||
- (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ this.workerNodes.length < this.minSize ||
+ (this.workerNodes.length >= this.minSize &&
+ this.workerNodes.some(workerNode => !workerNode.info.ready))
)
}
private get ready (): boolean {
return (
- this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ this.workerNodes.length >= this.minSize &&
+ this.workerNodes.every(workerNode => workerNode.info.ready)
)
}
?.worker
}
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
- name,
+ name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ const workerUsage = this.workerNodes[workerNodeKey].usage
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
+ const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
+ message.name as string
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
}
private updateTaskStatisticsWorkerUsage (
// Send startup message to worker.
this.sendToWorker(worker, {
ready: false,
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
// Setup worker task statistics computation.
this.setWorkerStatistics(worker)
void (this.destroyWorker(worker) as Promise<void>)
}
})
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
- this.sendToWorker(worker, { checkAlive: true })
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
+ this.checkMessageWorkerId(message)
if (message.ready != null && message.workerId != null) {
// Worker ready message received
this.handleWorkerReadyMessage(message)
}
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
- const worker = this.getWorkerById(message.workerId as number)
- if (worker != null) {
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).ready =
- message.ready as boolean
- } else {
- throw new Error(
- `Worker ready message received from unknown worker '${
- message.workerId as number
- }'`
- )
- }
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
+ },
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
}
}