/** @inheritDoc */
public listTaskFunctions (): string[] {
- if (
- Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
- (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
- ) {
- return this.getWorkerInfo(0).taskFunctions as string[]
- } else {
- return []
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctions) &&
+ workerNode.info.taskFunctions.length > 0
+ ) {
+ return workerNode.info.taskFunctions
+ }
}
+ return []
}
/** @inheritDoc */
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
name != null &&
- Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
- !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
- name
- )
+ Array.isArray(workerInfo.taskFunctions) &&
+ !workerInfo.taskFunctions.includes(name)
) {
reject(
new Error(`Task function '${name}' is not registered in the pool`)
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
+ workerId: workerInfo.id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
}
private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
- Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
- (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+ Array.isArray(workerInfo.taskFunctions) &&
+ workerInfo.taskFunctions.length > 1
)
}
protected workerListener (): (message: MessageValue<Response>) => void {
return (message) => {
this.checkMessageWorkerId(message)
- if (message.ready != null) {
+ if (message.ready != null && message.taskFunctions != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
if (message.ready === false) {
throw new Error(`Worker ${message.workerId} failed to initialize`)
}
- this.getWorkerInfo(
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).ready = message.ready as boolean
+ )
+ workerInfo.ready = message.ready as boolean
+ workerInfo.taskFunctions = message.taskFunctions
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const promiseResponse = this.promiseResponseMap.get(
- message.taskId as string
- )
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (message.taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, message.taskError)
- promiseResponse.reject(message.taskError.message)
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
} else {
- promiseResponse.resolve(message.data as Response)
+ promiseResponse.resolve(data as Response)
}
const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(message.taskId as string)
+ this.promiseResponseMap.delete(taskId as string)
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0 &&
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
*/
private addWorkerNode (worker: Worker): number {
- const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ const workerNode = new WorkerNode<Worker, Data>(
+ worker,
+ this.worker,
+ this.maxSize
+ )
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
- throw new Error('Worker node not found')
+ throw new Error('Worker node added not found')
}
return workerNodeKey
}
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].hasBackPressure()
+ ) {
+ this.emitter?.emit(PoolEvents.backPressure, {
+ workerId: this.getWorkerInfo(workerNodeKey).id,
+ ...this.info
+ })
+ }
return this.workerNodes[workerNodeKey].enqueueTask(task)
}