private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
+ this.checkValidWorkerChoiceStrategy(
+ opts.workerChoiceStrategy as WorkerChoiceStrategy
+ )
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
- this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.checkValidWorkerChoiceStrategyOptions(
+ opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ )
this.opts.workerChoiceStrategyOptions = {
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
...opts.workerChoiceStrategyOptions
}
- this.checkValidWorkerChoiceStrategyOptions(
- this.opts.workerChoiceStrategyOptions
- )
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
private checkValidWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
): void {
- if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
+ if (
+ workerChoiceStrategy != null &&
+ !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
+ ) {
throw new Error(
`Invalid worker choice strategy '${workerChoiceStrategy}'`
)
private checkValidWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
- if (!isPlainObject(workerChoiceStrategyOptions)) {
+ if (
+ workerChoiceStrategyOptions != null &&
+ !isPlainObject(workerChoiceStrategyOptions)
+ ) {
throw new TypeError(
'Invalid worker choice strategy options: must be a plain object'
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
!Number.isSafeInteger(workerChoiceStrategyOptions.retries)
) {
throw new TypeError(
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
workerChoiceStrategyOptions.retries < 0
) {
throw new RangeError(
)
}
if (
- workerChoiceStrategyOptions.weights != null &&
+ workerChoiceStrategyOptions?.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
)
}
if (
- workerChoiceStrategyOptions.measurement != null &&
+ workerChoiceStrategyOptions?.measurement != null &&
!Object.values(Measurements).includes(
workerChoiceStrategyOptions.measurement
)
}
if (
tasksQueueOptions?.concurrency != null &&
- !Number.isSafeInteger(tasksQueueOptions?.concurrency)
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
) {
throw new TypeError(
'Invalid worker node tasks concurrency: must be an integer'
}
if (
tasksQueueOptions?.concurrency != null &&
- tasksQueueOptions?.concurrency <= 0
+ tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
)
}
if (
tasksQueueOptions?.size != null &&
- !Number.isSafeInteger(tasksQueueOptions?.size)
+ !Number.isSafeInteger(tasksQueueOptions.size)
) {
throw new TypeError(
'Invalid worker node tasks queue size: must be an integer'
)
}
- if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
+ if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
+ `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
)
}
}
}
}
+ private buildTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): TasksQueueOptions {
+ return {
+ ...{
+ size: Math.pow(this.maxSize, 2),
+ concurrency: 1,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
+ },
+ ...tasksQueueOptions
+ }
+ }
+
private setTasksQueueSize (size: number): void {
for (const workerNode of this.workerNodes) {
workerNode.tasksQueueBackPressureSize = size
}
}
- private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
- ): TasksQueueOptions {
- return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
- ...tasksQueueOptions
- }
- }
-
/**
* Whether the pool is full or not.
*
this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
if (
- this.opts.restartWorkerOnError === true &&
this.started &&
- !this.starting
+ !this.starting &&
+ this.opts.restartWorkerOnError === true
) {
if (workerInfo.dynamic) {
this.createAndSetupDynamicWorkerNode()
this.createAndSetupWorkerNode()
}
}
- if (this.opts.enableTasksQueue === true) {
+ if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(workerNodeKey)
}
})
)
workerInfo.ready = message.ready as boolean
workerInfo.taskFunctions = message.taskFunctions
- if (this.emitter != null && this.ready) {
- this.emitter.emit(PoolEvents.ready, this.info)
+ if (this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
}
}