X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=5eeb57911903587c0760dac8fee11e30407cb86a;hb=5b95eb9bcafda56ce57003da834cf4e153bb0509;hp=65ace6d907a20ca5a88d820d638c1aaa1a6f8bf1;hpb=cdaecaee1c7fa5c412daf29f2db41470506793ac;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 65ace6d9..5eeb5791 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,17 +1,18 @@ +import { AsyncResource } from 'node:async_hooks' import { randomUUID } from 'node:crypto' +import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' -import { EventEmitterAsyncResource } from 'node:events' -import { AsyncResource } from 'node:async_hooks' + import type { MessageValue, PromiseResponseWrapper, Task } from '../utility-types.js' import { + average, DEFAULT_TASK_NAME, EMPTY_FUNCTION, - average, exponentialDelay, isKillBehavior, isPlainObject, @@ -21,8 +22,8 @@ import { round, sleep } from '../utils.js' -import { KillBehaviors } from '../worker/worker-options.js' import type { TaskFunction } from '../worker/task-functions.js' +import { KillBehaviors } from '../worker/worker-options.js' import { type IPool, PoolEvents, @@ -32,13 +33,6 @@ import { PoolTypes, type TasksQueueOptions } from './pool.js' -import type { - IWorker, - IWorkerNode, - WorkerInfo, - WorkerNodeEventDetail, - WorkerType -} from './worker.js' import { Measurements, WorkerChoiceStrategies, @@ -46,8 +40,6 @@ import { type WorkerChoiceStrategyOptions } from './selection-strategies/selection-strategies-types.js' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' -import { version } from './version.js' -import { WorkerNode } from './worker-node.js' import { checkFilePath, checkValidTasksQueueOptions, @@ -59,6 +51,15 @@ import { updateWaitTimeWorkerUsage, waitWorkerNodeEvents } from './utils.js' +import { version } from './version.js' +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerNodeEventDetail, + WorkerType +} from './worker.js' +import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. @@ -116,6 +117,10 @@ export abstract class AbstractPool< * Whether the pool is destroying or not. */ private destroying: boolean + /** + * Whether the minimum number of workers is starting or not. + */ + private startingMinimumNumberOfWorkers: boolean /** * Whether the pool ready event has been emitted or not. */ @@ -174,6 +179,7 @@ export abstract class AbstractPool< this.starting = false this.destroying = false this.readyEventEmitted = false + this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() } @@ -283,10 +289,11 @@ export abstract class AbstractPool< ready: this.ready, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion strategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate === true && + .runTime.aggregate === true && this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) @@ -351,7 +358,7 @@ export abstract class AbstractPool< 0 ), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate === true && { + .runTime.aggregate === true && { runTime: { minimum: round( min( @@ -394,7 +401,7 @@ export abstract class AbstractPool< } }), ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.waitTime.aggregate === true && { + .waitTime.aggregate === true && { waitTime: { minimum: round( min( @@ -443,6 +450,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -454,6 +464,13 @@ export abstract class AbstractPool< ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. * @@ -937,6 +954,23 @@ export abstract class AbstractPool< }) } + /** + * Starts the minimum number of workers. + */ + private startMinimumNumberOfWorkers (): void { + this.startingMinimumNumberOfWorkers = true + while ( + this.workerNodes.reduce( + (accumulator, workerNode) => + !workerNode.info.dynamic ? accumulator + 1 : accumulator, + 0 + ) < this.minimumNumberOfWorkers + ) { + this.createAndSetupWorkerNode() + } + this.startingMinimumNumberOfWorkers = false + } + /** @inheritdoc */ public start (): void { if (this.started) { @@ -949,15 +983,7 @@ export abstract class AbstractPool< throw new Error('Cannot start a destroying pool') } this.starting = true - while ( - this.workerNodes.reduce( - (accumulator, workerNode) => - !workerNode.info.dynamic ? accumulator + 1 : accumulator, - 0 - ) < this.minimumNumberOfWorkers - ) { - this.createAndSetupWorkerNode() - } + this.startMinimumNumberOfWorkers() this.starting = false this.started = true } @@ -981,7 +1007,6 @@ export abstract class AbstractPool< ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() - this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1223,7 +1248,7 @@ export abstract class AbstractPool< 'error', this.opts.errorHandler ?? EMPTY_FUNCTION ) - workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.registerOnceWorkerEventHandler('error', (error: Error) => { workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) if ( @@ -1233,8 +1258,8 @@ export abstract class AbstractPool< ) { if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() - } else { - this.createAndSetupWorkerNode() + } else if (!this.startingMinimumNumberOfWorkers) { + this.startMinimumNumberOfWorkers() } } if ( @@ -1244,7 +1269,8 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1254,6 +1280,13 @@ export abstract class AbstractPool< ) workerNode.registerOnceWorkerEventHandler('exit', () => { this.removeWorkerNode(workerNode) + if ( + this.started && + !this.startingMinimumNumberOfWorkers && + !this.destroying + ) { + this.startMinimumNumberOfWorkers() + } }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1272,7 +1305,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1285,7 +1318,7 @@ export abstract class AbstractPool< ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) - this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1299,7 +1332,7 @@ export abstract class AbstractPool< taskFunctionOperation: 'add', taskFunctionName, taskFunction: taskFunction.toString() - }).catch(error => { + }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } @@ -1406,9 +1439,9 @@ export abstract class AbstractPool< statistics: { runTime: this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() - ?.runTime.aggregate ?? false, + .runTime.aggregate ?? false, elu: - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu .aggregate ?? false } }) @@ -1552,8 +1585,7 @@ export abstract class AbstractPool< ) { workerInfo.stealing = false // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - for (const taskName of this.workerNodes[workerNodeKey].info - .taskFunctionNames!) { + for (const taskName of workerInfo.taskFunctionNames!) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, taskName @@ -1602,7 +1634,9 @@ export abstract class AbstractPool< this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) - .catch(EMPTY_FUNCTION) + .catch((error: unknown) => { + this.emitter?.emit(PoolEvents.error, error) + }) } private readonly workerNodeStealTask = ( @@ -1708,6 +1742,13 @@ export abstract class AbstractPool< } } + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true + } + } + private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message if (ready == null || !ready) { @@ -1717,10 +1758,7 @@ export abstract class AbstractPool< this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] workerNode.info.ready = ready workerNode.info.taskFunctionNames = taskFunctionNames - if (!this.readyEventEmitted && this.ready) { - this.emitter?.emit(PoolEvents.ready, this.info) - this.readyEventEmitted = true - } + this.checkAndEmitReadyEvent() } private handleTaskExecutionResponse (message: MessageValue): void { @@ -1750,7 +1788,12 @@ export abstract class AbstractPool< this.promiseResponseMap.delete(taskId!) // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true && !this.destroying) { + if ( + this.opts.enableTasksQueue === true && + !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode != null + ) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && @@ -1767,8 +1810,7 @@ export abstract class AbstractPool< workerNodeTasksUsage.sequentiallyStolen === 0 ) { workerNode.emit('idle', { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerId: workerId!, + workerId, workerNodeKey }) } @@ -1845,6 +1887,13 @@ export abstract class AbstractPool< return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** * Removes the worker node from the pool worker nodes. * @@ -1856,6 +1905,7 @@ export abstract class AbstractPool< this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {