X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=eb2c2f7818a687ac8f8016eb27489895cadfaaee;hb=671d515455c745dc74f4c385fe23683975bfc3df;hp=e7c27cdae8a881392aa3cb4c51e8ab0386365f99;hpb=5441aea6894a5031667db6447634fa5d97f69e36;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e7c27cda..eb2c2f78 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -646,14 +646,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if ( - Array.isArray(this.getWorkerInfo(0).taskFunctions) && - (this.getWorkerInfo(0).taskFunctions as string[]).length > 0 - ) { - return this.getWorkerInfo(0).taskFunctions as string[] - } else { - return [] + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } } + return [] } /** @inheritDoc */ @@ -678,12 +679,11 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( name != null && - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes( - name - ) + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) @@ -695,7 +695,7 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -816,9 +816,10 @@ export abstract class AbstractPool< } private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) return ( - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1 + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 1 ) } @@ -1125,7 +1126,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { @@ -1144,9 +1145,11 @@ export abstract class AbstractPool< if (message.ready === false) { throw new Error(`Worker ${message.workerId} failed to initialize`) } - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } @@ -1209,7 +1212,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1217,7 +1224,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1247,6 +1254,15 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } return this.workerNodes[workerNodeKey].enqueueTask(task) }