X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=768114e4cafcce5249f04e448240f0e9eb7bc5c6;hb=e2b31e32498626103ef3c737bdffb285087b13e6;hp=25c0eba870ee62560e7d47c69cad7909fa4ccc4e;hpb=b558f6b5a5625753de41024325e40e1cbd03eda1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 25c0eba8..768114e4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -646,14 +646,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if ( - Array.isArray(this.getWorkerInfo(0).taskFunctions) && - (this.getWorkerInfo(0).taskFunctions as string[]).length > 0 - ) { - return this.getWorkerInfo(0).taskFunctions as string[] - } else { - return [] + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } } + return [] } /** @inheritDoc */ @@ -678,12 +679,11 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( name != null && - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes( - name - ) + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) @@ -695,7 +695,7 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -816,9 +816,10 @@ export abstract class AbstractPool< } private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) return ( - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1 + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 1 ) } @@ -1125,7 +1126,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { @@ -1144,28 +1145,29 @@ export abstract class AbstractPool< if (message.ready === false) { throw new Error(`Worker ${message.workerId} failed to initialize`) } - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get( - message.taskId as string - ) + const { taskId, taskError, data } = message + const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (message.taskError != null) { - this.emitter?.emit(PoolEvents.taskError, message.taskError) - promiseResponse.reject(message.taskError.message) + if (taskError != null) { + this.emitter?.emit(PoolEvents.taskError, taskError) + promiseResponse.reject(taskError.message) } else { - promiseResponse.resolve(message.data as Response) + promiseResponse.resolve(data as Response) } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.taskId as string) + this.promiseResponseMap.delete(taskId as string) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 && @@ -1210,7 +1212,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1218,7 +1224,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1236,6 +1242,17 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + return true + } + return false + } + /** * Executes the given task on the worker given its worker node key. * @@ -1248,7 +1265,14 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasWorkerNodeBackPressure(workerNodeKey)) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {