private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
+ this.checkValidWorkerChoiceStrategy(
+ opts.workerChoiceStrategy as WorkerChoiceStrategy
+ )
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
- this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.checkValidWorkerChoiceStrategyOptions(
+ opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ )
this.opts.workerChoiceStrategyOptions = {
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
...opts.workerChoiceStrategyOptions
}
- this.checkValidWorkerChoiceStrategyOptions(
- this.opts.workerChoiceStrategyOptions
- )
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
private checkValidWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
): void {
- if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
+ if (
+ workerChoiceStrategy != null &&
+ !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
+ ) {
throw new Error(
`Invalid worker choice strategy '${workerChoiceStrategy}'`
)
private checkValidWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
- if (!isPlainObject(workerChoiceStrategyOptions)) {
+ if (
+ workerChoiceStrategyOptions != null &&
+ !isPlainObject(workerChoiceStrategyOptions)
+ ) {
throw new TypeError(
'Invalid worker choice strategy options: must be a plain object'
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
!Number.isSafeInteger(workerChoiceStrategyOptions.retries)
) {
throw new TypeError(
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
workerChoiceStrategyOptions.retries < 0
) {
throw new RangeError(
)
}
if (
- workerChoiceStrategyOptions.weights != null &&
+ workerChoiceStrategyOptions?.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
)
}
if (
- workerChoiceStrategyOptions.measurement != null &&
+ workerChoiceStrategyOptions?.measurement != null &&
!Object.values(Measurements).includes(
workerChoiceStrategyOptions.measurement
)
}
if (
tasksQueueOptions?.concurrency != null &&
- !Number.isSafeInteger(tasksQueueOptions?.concurrency)
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
) {
throw new TypeError(
'Invalid worker node tasks concurrency: must be an integer'
}
if (
tasksQueueOptions?.concurrency != null &&
- tasksQueueOptions?.concurrency <= 0
+ tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
)
}
if (
tasksQueueOptions?.size != null &&
- !Number.isSafeInteger(tasksQueueOptions?.size)
+ !Number.isSafeInteger(tasksQueueOptions.size)
) {
throw new TypeError(
'Invalid worker node tasks queue size: must be an integer'
)
}
- if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
+ if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
+ `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
)
}
}
tasksQueueOptions?: TasksQueueOptions
): void {
if (this.opts.enableTasksQueue === true && !enable) {
+ this.unsetTaskStealing()
+ this.unsetTasksStealingOnBackPressure()
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.setTaskStealing()
+ } else {
+ this.unsetTaskStealing()
+ }
+ if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.setTasksStealingOnBackPressure()
+ } else {
+ this.unsetTasksStealingOnBackPressure()
+ }
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
}
+ private setTaskStealing (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ }
+
+ private unsetTaskStealing (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ delete this.workerNodes[workerNodeKey].onEmptyQueue
+ }
+ }
+
+ private setTasksStealingOnBackPressure (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
+ }
+
+ private unsetTasksStealingOnBackPressure (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ delete this.workerNodes[workerNodeKey].onBackPressure
+ }
+ }
+
private buildTasksQueueOptions (
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {