X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=30b4d01f9846cd2c2327d5a52e4a6be594e8d62c;hb=23a6c28d3cc6edb778653a4d9d9a4d36f9a961c3;hp=086d7ee82550bc40c46781b83616c1639b32155b;hpb=aa9eede876376ffbb6d8585192f3865849252f5c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 086d7ee8..30b4d01f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -114,6 +114,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -171,7 +172,15 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) @@ -325,16 +334,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -492,7 +505,7 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && - this.getWorkerNodeKeyByWorkerId(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -506,7 +519,7 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( workerNode => workerNode.worker === worker ) @@ -516,14 +529,12 @@ export abstract class AbstractPool< * Gets the worker node key given its worker id. * * @param workerId - The worker id. - * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise. + * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined { - for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { - if (workerNode.info.id === workerId) { - return workerNodeKey - } - } + private getWorkerNodeKeyByWorkerId (workerId: number): number { + return this.workerNodes.findIndex( + workerNode => workerNode.info.id === workerId + ) } /** @inheritDoc */ @@ -610,9 +621,10 @@ export abstract class AbstractPool< */ protected internalBusy (): boolean { return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 + this.workerNodes.findIndex( + workerNode => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 ) } @@ -635,15 +647,14 @@ export abstract class AbstractPool< workerNodeKey }) if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number)) ) { - this.enqueueTask(workerNodeKey, task) - } else { this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) } this.checkAndEmitEvents() }) @@ -652,16 +663,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) + this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) - await workerExitPromise }) ) } @@ -671,9 +674,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode ( - workerNodeKey: number - ): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -862,7 +863,7 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() @@ -901,8 +902,9 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, message => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId - ) as number + ) const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && @@ -912,19 +914,18 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorkerNode(localWorkerNodeKey) as Promise) + this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) } }) const workerInfo = this.getWorkerInfo(workerNodeKey) - workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { - workerInfo.ready = true - } this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) + workerInfo.dynamic = true + if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + workerInfo.ready = true + } return workerNodeKey } @@ -985,6 +986,7 @@ export abstract class AbstractPool< while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -992,6 +994,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -1004,10 +1012,17 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } @@ -1020,10 +1035,10 @@ export abstract class AbstractPool< return message => { this.checkMessageWorkerId(message) if (message.ready != null) { - // Worker ready response received + // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.id != null) { - // Task execution response received + // Task execution response received from worker this.handleTaskExecutionResponse(message) } } @@ -1031,7 +1046,7 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) as number + this.getWorkerNodeKeyByWorkerId(message.workerId) ).ready = message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) @@ -1052,7 +1067,9 @@ export abstract class AbstractPool< this.promiseResponseMap.delete(message.id as string) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + this.tasksQueueSize(workerNodeKey) > 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask( workerNodeKey, @@ -1098,7 +1115,7 @@ export abstract class AbstractPool< workerNode.info.ready = true } this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { throw new Error('Worker node not found') } @@ -1111,7 +1128,7 @@ export abstract class AbstractPool< * @param worker - The worker. */ private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1141,7 +1158,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey,