* The start timestamp of the pool.
*/
private readonly startTimestamp
- /**
- * The task function names.
- */
- private taskFunctions!: string[]
/**
* Constructs a new poolifier pool.
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
- this.opts.workerChoiceStrategyOptions =
- opts.workerChoiceStrategyOptions ??
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.opts.workerChoiceStrategyOptions = {
+ ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+ ...opts.workerChoiceStrategyOptions
+ }
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategyOptions
)
'Invalid worker choice strategy options: must be a plain object'
)
}
+ if (
+ workerChoiceStrategyOptions.choiceRetries != null &&
+ !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+ ) {
+ throw new TypeError(
+ 'Invalid worker choice strategy options: choice retries must be an integer'
+ )
+ }
+ if (
+ workerChoiceStrategyOptions.choiceRetries != null &&
+ workerChoiceStrategyOptions.choiceRetries <= 0
+ ) {
+ throw new RangeError(
+ `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+ )
+ }
if (
workerChoiceStrategyOptions.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
private checkMessageWorkerId (message: MessageValue<Response>): void {
- if (
+ if (message.workerId == null) {
+ throw new Error('Worker message received without worker id')
+ } else if (
message.workerId != null &&
this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
) {
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.opts.workerChoiceStrategyOptions = {
+ ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+ ...workerChoiceStrategyOptions
+ }
this.workerChoiceStrategyContext.setOptions(
this.opts.workerChoiceStrategyOptions
)
/** @inheritDoc */
public listTaskFunctions (): string[] {
- if (this.taskFunctions != null) {
- return this.taskFunctions
- } else {
- return []
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctions) &&
+ workerNode.info.taskFunctions.length > 0
+ ) {
+ return workerNode.info.taskFunctions
+ }
}
+ return []
}
/** @inheritDoc */
) {
reject(new TypeError('name argument must not be an empty string'))
}
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ }
+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode()
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
name != null &&
- this.taskFunctions != null &&
- !this.taskFunctions.includes(name)
+ Array.isArray(workerInfo.taskFunctions) &&
+ !workerInfo.taskFunctions.includes(name)
) {
reject(
new Error(`Task function '${name}' is not registered in the pool`)
)
}
- if (transferList != null && !Array.isArray(transferList)) {
- reject(new TypeError('transferList argument must be an array'))
- }
- const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
+ workerId: workerInfo.id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
await this.destroyWorkerNode(workerNodeKey)
})
)
+ this.emitter?.emit(PoolEvents.destroy)
}
protected async sendKillMessageToWorker (
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
- const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
- task.name as string
- ) as WorkerUsage
- ++taskWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+ if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+ const taskWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+ }
}
/**
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
- const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
- message.taskPerformance?.name ?? DEFAULT_TASK_NAME
- ) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
- this.updateEluWorkerUsage(taskWorkerUsage, message)
+ if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+ const taskWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskWorkerUsage(
+ message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
+ }
+ }
+
+ private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ return (
+ Array.isArray(workerInfo.taskFunctions) &&
+ workerInfo.taskFunctions.length > 1
+ )
}
private updateTaskStatisticsWorkerUsage (
protected createAndSetupWorkerNode (): number {
const worker = this.createWorker()
+ worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', (error) => {
this.redistributeQueuedTasks(workerNodeKey)
}
})
- worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
this.removeWorkerNode(worker)
protected workerListener (): (message: MessageValue<Response>) => void {
return (message) => {
this.checkMessageWorkerId(message)
- if (message.ready != null) {
+ if (message.ready != null && message.taskFunctions != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
this.handleTaskExecutionResponse(message)
} else if (message.taskFunctions != null) {
// Task functions message received from worker
- this.taskFunctions = message.taskFunctions
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ).taskFunctions = message.taskFunctions
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- this.getWorkerInfo(
+ if (message.ready === false) {
+ throw new Error(`Worker ${message.workerId} failed to initialize`)
+ }
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).ready = message.ready as boolean
+ )
+ workerInfo.ready = message.ready as boolean
+ workerInfo.taskFunctions = message.taskFunctions
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const promiseResponse = this.promiseResponseMap.get(
- message.taskId as string
- )
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (message.taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, message.taskError)
- promiseResponse.reject(message.taskError.message)
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
} else {
- promiseResponse.resolve(message.data as Response)
+ promiseResponse.resolve(data as Response)
}
const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(message.taskId as string)
+ this.promiseResponseMap.delete(taskId as string)
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0 &&
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
*/
private addWorkerNode (worker: Worker): number {
- const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ const workerNode = new WorkerNode<Worker, Data>(
+ worker,
+ this.worker,
+ this.maxSize
+ )
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
- throw new Error('Worker node not found')
+ throw new Error('Worker node added not found')
}
return workerNodeKey
}
}
}
+ /** @inheritDoc */
+ public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].hasBackPressure()
+ ) {
+ return true
+ }
+ return false
+ }
+
/**
* Executes the given task on the worker given its worker node key.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].enqueueTask(task)
+ const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+ if (this.hasWorkerNodeBackPressure(workerNodeKey)) {
+ this.emitter?.emit(PoolEvents.backPressure, {
+ workerId: this.getWorkerInfo(workerNodeKey).id,
+ ...this.info
+ })
+ }
+ return tasksQueueSize
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {