X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6457b1554ddbf5bfeb0156a0d40e9bb108dd02dc;hb=3725d989b40800f70d495107b363c7efa2c96fc3;hp=aae083953c526c16f7bcaa92bddd3bf042423d3e;hpb=f201a0cd2575be684ab0f291c8590eec538038e0;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index aae08395..6457b155 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -418,14 +418,14 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -461,14 +461,14 @@ export abstract class AbstractPool< minimum: round( min( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -590,7 +590,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.worker === worker + workerNode => workerNode.worker === worker ) } @@ -602,7 +602,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - (workerNode) => workerNode.info.id === workerId + workerNode => workerNode.info.id === workerId ) } @@ -706,7 +706,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) @@ -715,7 +715,7 @@ export abstract class AbstractPool< } else { return ( this.workerNodes.findIndex( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.usage.tasks.executing === 0 ) === -1 ) @@ -772,14 +772,13 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: workerInfo.id as number, + workerId: this.getWorkerInfo(workerNodeKey).id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -815,7 +814,7 @@ export abstract class AbstractPool< workerId: number ): Promise { await new Promise((resolve, reject) => { - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { if (message.kill === 'success') { resolve() } else if (message.kill === 'failure') { @@ -1064,7 +1063,7 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', (error) => { + worker.on('error', error => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false @@ -1072,8 +1071,8 @@ export abstract class AbstractPool< this.emitter?.emit(PoolEvents.error, error) if ( this.opts.restartWorkerOnError === true && - !this.starting && - this.started + this.started && + !this.starting ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1104,7 +1103,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, (message) => { + this.registerWorkerMessageListener(workerNodeKey, message => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -1119,7 +1118,7 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.destroyWorkerNode(localWorkerNodeKey).catch(error => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1203,27 +1202,23 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - if (!workerNode.info.ready) { - return minWorkerNodeKey - } - return workerNode.usage.tasks.queued < - workerNodes[minWorkerNodeKey].usage.tasks.queued + return workerNode.info.ready && + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey : minWorkerNodeKey }, 0 ) - if (destinationWorkerNodeKey != null) { - const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] - const task = { - ...(this.dequeueTask(workerNodeKey) as Task), - workerId: destinationWorkerNode.info.id as number - } - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: destinationWorkerNode.info.id as number + } + if (this.shallExecuteTask(destinationWorkerNodeKey)) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) } } } @@ -1257,7 +1252,7 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) const sourceWorkerNode = workerNodes.find( - (workerNode) => + workerNode => workerNode.info.ready && workerNode.info.id !== workerId && workerNode.usage.tasks.queued > 0 @@ -1323,7 +1318,7 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return (message) => { + return message => { this.checkMessageWorkerId(message) if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker @@ -1461,7 +1456,7 @@ export abstract class AbstractPool< return ( this.opts.enableTasksQueue === true && this.workerNodes.findIndex( - (workerNode) => !workerNode.hasBackPressure() + workerNode => !workerNode.hasBackPressure() ) === -1 ) }