+ * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param filePath - Path to the worker file.
+ * @param opts - Options for the pool.
+ */
+ public constructor (
+ protected readonly numberOfWorkers: number,
+ protected readonly filePath: string,
+ protected readonly opts: PoolOptions<Worker>
+ ) {
+ if (!this.isMain()) {
+ throw new Error('Cannot start a pool from a worker!')
+ }
+ this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkFilePath(this.filePath)
+ this.checkPoolOptions(this.opts)
+
+ this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+ this.executeTask = this.executeTask.bind(this)
+ this.enqueueTask = this.enqueueTask.bind(this)
+ this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
+
+ if (this.opts.enableEvents === true) {
+ this.emitter = new PoolEmitter()
+ }
+ this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >(
+ this,
+ this.opts.workerChoiceStrategy,
+ this.opts.workerChoiceStrategyOptions
+ )
+
+ this.setupHook()
+
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
+ this.createAndSetupWorker()
+ }
+ this.starting = false
+
+ this.startTimestamp = performance.now()
+ }
+
+ private checkFilePath (filePath: string): void {
+ if (
+ filePath == null ||
+ typeof filePath !== 'string' ||
+ (typeof filePath === 'string' && filePath.trim().length === 0)
+ ) {
+ throw new Error('Please specify a file with a worker implementation')
+ }
+ if (!existsSync(filePath)) {
+ throw new Error(`Cannot find the worker file '${filePath}'`)
+ }
+ }
+
+ private checkNumberOfWorkers (numberOfWorkers: number): void {
+ if (numberOfWorkers == null) {
+ throw new Error(
+ 'Cannot instantiate a pool without specifying the number of workers'
+ )
+ } else if (!Number.isSafeInteger(numberOfWorkers)) {
+ throw new TypeError(
+ 'Cannot instantiate a pool with a non safe integer number of workers'
+ )
+ } else if (numberOfWorkers < 0) {
+ throw new RangeError(
+ 'Cannot instantiate a pool with a negative number of workers'
+ )
+ } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic) {
+ if (min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a pool size equal to zero'
+ )
+ } else if (min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
+ }
+ }
+ }
+
+ private checkPoolOptions (opts: PoolOptions<Worker>): void {
+ if (isPlainObject(opts)) {
+ this.opts.workerChoiceStrategy =
+ opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.opts.workerChoiceStrategyOptions =
+ opts.workerChoiceStrategyOptions ??
+ DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.checkValidWorkerChoiceStrategyOptions(
+ this.opts.workerChoiceStrategyOptions
+ )
+ this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
+ this.opts.enableEvents = opts.enableEvents ?? true
+ this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+ if (this.opts.enableTasksQueue) {
+ this.checkValidTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ }
+ } else {
+ throw new TypeError('Invalid pool options: must be a plain object')
+ }
+ }
+
+ private checkValidWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
+ throw new Error(
+ `Invalid worker choice strategy '${workerChoiceStrategy}'`
+ )
+ }
+ }
+
+ private checkValidWorkerChoiceStrategyOptions (
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ ): void {
+ if (!isPlainObject(workerChoiceStrategyOptions)) {
+ throw new TypeError(
+ 'Invalid worker choice strategy options: must be a plain object'
+ )
+ }
+ if (
+ workerChoiceStrategyOptions.weights != null &&
+ Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ ) {
+ throw new Error(
+ 'Invalid worker choice strategy options: must have a weight for each worker node'
+ )
+ }
+ if (
+ workerChoiceStrategyOptions.measurement != null &&
+ !Object.values(Measurements).includes(
+ workerChoiceStrategyOptions.measurement
+ )
+ ) {
+ throw new Error(
+ `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+ )
+ }
+ }
+
+ private checkValidTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): void {
+ if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
+ throw new TypeError('Invalid tasks queue options: must be a plain object')
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
+ ) {
+ throw new TypeError(
+ 'Invalid worker tasks concurrency: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ tasksQueueOptions.concurrency <= 0
+ ) {
+ throw new Error(
+ `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
+ )
+ }
+ }
+
+ /** @inheritDoc */
+ public get info (): PoolInfo {
+ return {
+ version,
+ type: this.type,
+ worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
+ minSize: this.minSize,
+ maxSize: this.maxSize,
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && { utilization: round(this.utilization) }),
+ workerNodes: this.workerNodes.length,
+ idleWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.usage.tasks.executing === 0
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ),
+ busyWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ executedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.executed,
+ 0
+ ),
+ executingTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.executing,
+ 0
+ ),
+ queuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.queued,
+ 0
+ ),
+ maxQueuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ 0
+ ),
+ failedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.failed,
+ 0
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate && {
+ runTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ }),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ waitTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ })
+ }
+ }
+
+ private get ready (): boolean {
+ return (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic && workerNode.info.ready
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ) >= this.minSize
+ )
+ }
+
+ /**
+ * Gets the approximate pool utilization.