* Whether the pool is destroying or not.
*/
private destroying: boolean
+ /**
+ * Whether the minimum number of workers is starting or not.
+ */
+ private startingMinimumNumberOfWorkers: boolean
/**
* Whether the pool ready event has been emitted or not.
*/
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
public async execute (
data?: Data,
name?: string,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
})
}
+ /**
+ * Starts the minimum number of workers.
+ */
+ private startMinimumNumberOfWorkers (): void {
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
/** @inheritdoc */
public start (): void {
if (this.started) {
throw new Error('Cannot start a destroying pool')
}
this.starting = true
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.minimumNumberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
+ this.startMinimumNumberOfWorkers()
this.starting = false
this.started = true
}
}
/**
- * Should return whether the worker is the main worker or not.
+ * Returns whether the worker is the main worker or not.
+ *
+ * @returns `true` if the worker is the main worker, `false` otherwise.
*/
protected abstract isMain (): boolean
protected abstract sendToWorker (
workerNodeKey: number,
message: MessageValue<Data>,
- transferList?: TransferListItem[]
+ transferList?: readonly TransferListItem[]
): void
/**
'error',
this.opts.errorHandler ?? EMPTY_FUNCTION
)
- workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
if (
) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
- } else {
- this.createAndSetupWorkerNode()
+ } else if (!this.startingMinimumNumberOfWorkers) {
+ this.startMinimumNumberOfWorkers()
}
}
if (
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- workerNode?.terminate().catch(error => {
+ workerNode?.terminate().catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
})
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
this.removeWorkerNode(workerNode)
+ if (
+ this.started &&
+ !this.startingMinimumNumberOfWorkers &&
+ !this.destroying
+ ) {
+ this.startMinimumNumberOfWorkers()
+ }
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
- this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
taskFunctionOperation: 'add',
taskFunctionName,
taskFunction: taskFunction.toString()
- }).catch(error => {
+ }).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(error => {
+ .catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}