* The start timestamp of the pool.
*/
private readonly startTimestamp
- /**
- * The task function names.
- */
- private taskFunctions!: string[]
/**
* Constructs a new poolifier pool.
/** @inheritDoc */
public listTaskFunctions (): string[] {
- if (this.taskFunctions != null) {
- return this.taskFunctions
+ if (
+ Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
+ (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
+ ) {
+ return this.getWorkerInfo(0).taskFunctions as string[]
} else {
return []
}
) {
reject(new TypeError('name argument must not be an empty string'))
}
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ }
+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode()
if (
name != null &&
- this.taskFunctions != null &&
- !this.taskFunctions.includes(name)
+ Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+ !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
+ name
+ )
) {
reject(
new Error(`Task function '${name}' is not registered in the pool`)
)
}
- if (transferList != null && !Array.isArray(transferList)) {
- reject(new TypeError('transferList argument must be an array'))
- }
- const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
this.updateWaitTimeWorkerUsage(workerUsage, task)
- const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
- task.name as string
- ) as WorkerUsage
- ++taskWorkerUsage.tasks.executing
- this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+ if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+ const taskWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+ ++taskWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+ }
}
/**
this.updateTaskStatisticsWorkerUsage(workerUsage, message)
this.updateRunTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
- const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
- message.taskPerformance?.name ?? DEFAULT_TASK_NAME
- ) as WorkerUsage
- this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
- this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
- this.updateEluWorkerUsage(taskWorkerUsage, message)
+ if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+ const taskWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskWorkerUsage(
+ message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+ ) as WorkerUsage
+ this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+ this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+ this.updateEluWorkerUsage(taskWorkerUsage, message)
+ }
+ }
+
+ private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+ return (
+ Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+ (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+ )
}
private updateTaskStatisticsWorkerUsage (
this.handleTaskExecutionResponse(message)
} else if (message.taskFunctions != null) {
// Task functions message received from worker
- this.taskFunctions = message.taskFunctions
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ).taskFunctions = message.taskFunctions
}
}
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const promiseResponse = this.promiseResponseMap.get(
- message.taskId as string
- )
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (message.taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, message.taskError)
- promiseResponse.reject(message.taskError.message)
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
} else {
- promiseResponse.resolve(message.data as Response)
+ promiseResponse.resolve(data as Response)
}
const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(message.taskId as string)
+ this.promiseResponseMap.delete(taskId as string)
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0 &&