+import { AsyncResource } from 'node:async_hooks'
import { randomUUID } from 'node:crypto'
+import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
-import { EventEmitterAsyncResource } from 'node:events'
-import { AsyncResource } from 'node:async_hooks'
+
import type {
MessageValue,
PromiseResponseWrapper,
Task
} from '../utility-types.js'
import {
+ average,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
- average,
exponentialDelay,
isKillBehavior,
isPlainObject,
round,
sleep
} from '../utils.js'
-import { KillBehaviors } from '../worker/worker-options.js'
import type { TaskFunction } from '../worker/task-functions.js'
+import { KillBehaviors } from '../worker/worker-options.js'
import {
type IPool,
PoolEvents,
PoolTypes,
type TasksQueueOptions
} from './pool.js'
-import type {
- IWorker,
- IWorkerNode,
- WorkerInfo,
- WorkerNodeEventDetail,
- WorkerType
-} from './worker.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
-import { version } from './version.js'
-import { WorkerNode } from './worker-node.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
} from './utils.js'
+import { version } from './version.js'
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerNodeEventDetail,
+ WorkerType
+} from './worker.js'
+import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
* Whether the pool is destroying or not.
*/
private destroying: boolean
+ /**
+ * Whether the minimum number of workers is starting or not.
+ */
+ private startingMinimumNumberOfWorkers: boolean
/**
* Whether the pool ready event has been emitted or not.
*/
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
})
}
+ /**
+ * Starts the minimum number of workers.
+ */
+ private startMinimumNumberOfWorkers (): void {
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
/** @inheritdoc */
public start (): void {
if (this.started) {
throw new Error('Cannot start a destroying pool')
}
this.starting = true
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.minimumNumberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
+ this.startMinimumNumberOfWorkers()
this.starting = false
this.started = true
}
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
- this.emitter?.removeAllListeners()
this.readyEventEmitted = false
this.destroying = false
this.started = false
'error',
this.opts.errorHandler ?? EMPTY_FUNCTION
)
- workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
if (
) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
- } else {
- this.createAndSetupWorkerNode()
+ } else if (!this.startingMinimumNumberOfWorkers) {
+ this.startMinimumNumberOfWorkers()
}
}
if (
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- workerNode?.terminate().catch(error => {
+ workerNode?.terminate().catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
})
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
this.removeWorkerNode(workerNode)
+ if (
+ this.started &&
+ !this.startingMinimumNumberOfWorkers &&
+ !this.destroying
+ ) {
+ this.startMinimumNumberOfWorkers()
+ }
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
- this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
taskFunctionOperation: 'add',
taskFunctionName,
taskFunction: taskFunction.toString()
- }).catch(error => {
+ }).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
) {
workerInfo.stealing = false
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames!) {
+ for (const taskName of workerInfo.taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
taskName
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(error => {
+ .catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}