): boolean {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
- this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.opts.workerChoiceStrategyOptions = {
+ ...this.opts.workerChoiceStrategyOptions,
+ ...workerChoiceStrategyOptions,
+ }
this.workerChoiceStrategiesContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
...getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
),
+ ...this.opts.tasksQueueOptions,
...tasksQueueOptions,
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
if (
this.cannotStealTask() ||
(this.info.stealingWorkerNodes ?? 0) >
- Math.floor(this.workerNodes.length / 2)
+ Math.round(
+ this.workerNodes.length *
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.tasksStealingRatio!
+ )
) {
if (previousStolenTask != null) {
workerInfo.stealing = false
this.cannotStealTask() ||
this.hasBackPressure() ||
(this.info.stealingWorkerNodes ?? 0) >
- Math.floor(this.workerNodes.length / 2)
+ Math.round(
+ this.workerNodes.length *
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.tasksStealingRatio!
+ )
) {
return
}
concurrency: 1,
taskStealing: true,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.6,
tasksFinishedTimeout: 2000,
}
}
`Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero`
)
}
+ if (
+ tasksQueueOptions?.tasksStealingRatio != null &&
+ typeof tasksQueueOptions.tasksStealingRatio !== 'number'
+ ) {
+ throw new TypeError(
+ 'Invalid worker node tasks stealing ratio: must be a number'
+ )
+ }
+ if (
+ tasksQueueOptions?.tasksStealingRatio != null &&
+ (tasksQueueOptions.tasksStealingRatio < 0 ||
+ tasksQueueOptions.tasksStealingRatio > 1)
+ ) {
+ throw new RangeError(
+ 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+ )
+ }
}
export const checkWorkerNodeArguments = (
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.6,
tasksFinishedTimeout: 2000,
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
).toThrow(
new TypeError('Invalid worker node tasks queue size: must be an integer')
)
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksStealingRatio: '' },
+ }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Invalid worker node tasks stealing ratio: must be a number'
+ )
+ )
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksStealingRatio: 1.1 },
+ }
+ )
+ ).toThrow(
+ new RangeError(
+ 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+ )
+ )
})
it('Verify that pool worker choice strategy options can be set', async () => {
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.6,
tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(true, { concurrency: 2 })
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.6,
tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(false)
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.6,
tasksFinishedTimeout: 2000,
})
for (const workerNode of pool.workerNodes) {
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.5,
tasksFinishedTimeout: 3000,
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
+ tasksStealingRatio: 0.5,
tasksFinishedTimeout: 3000,
})
for (const workerNode of pool.workerNodes) {
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: Math.pow(numberOfWorkers, 2),
+ size: 2,
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000,
+ tasksStealingRatio: 0.5,
+ tasksFinishedTimeout: 3000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
new TypeError('Invalid worker node tasks queue size: must be an integer')
)
+ expect(() => pool.setTasksQueueOptions({ tasksStealingRatio: '' })).toThrow(
+ new TypeError(
+ 'Invalid worker node tasks stealing ratio: must be a number'
+ )
+ )
+ expect(() =>
+ pool.setTasksQueueOptions({ tasksStealingRatio: 1.1 })
+ ).toThrow(
+ new TypeError(
+ 'Invalid worker node tasks stealing ratio: must be between 0 and 1'
+ )
+ )
await pool.destroy()
})