## [Unreleased]
+### Added
+
+- Add queued tasks end timeout support to worker node termination.
+
## [3.1.4] - 2023-12-18
### Fixed
### Fixed
-- Wait for queued tasks to end at worker termination.
+- Wait for queued tasks to end at worker node termination.
## [3.1.1] - 2023-12-16
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
- `taskStealing` (optional) - Task stealing enablement on idle.
- `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement under back pressure.
+ - `tasksFinishedTimeout` (optional) - Queued tasks finished timeout in milliseconds at worker termination.
- Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
+ Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, tasksFinishedTimeout: 1000 }`
- `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
checkFilePath,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
+ getDefaultTasksQueueOptions,
updateEluWorkerUsage,
updateRunTimeWorkerUsage,
updateTaskStatisticsWorkerUsage,
}
}
- /**
- * Gets the given worker its worker node key.
- *
- * @param worker - The worker.
- * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
- */
- private getWorkerNodeKeyByWorker (worker: Worker): number {
- return this.workerNodes.findIndex(
- workerNode => workerNode.worker === worker
- )
- }
-
/**
* Gets the worker node key given its worker id.
*
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...{
- size: Math.pow(this.maxSize, 2),
- concurrency: 1,
- taskStealing: true,
- tasksStealingOnBackPressure: true
- },
+ ...getDefaultTasksQueueOptions(this.maxSize),
...tasksQueueOptions
}
}
this.flagWorkerNodeAsNotReady(workerNodeKey)
const flushedTasks = this.flushTasksQueue(workerNodeKey)
const workerNode = this.workerNodes[workerNodeKey]
- await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks)
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ )
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
}
if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode.terminate().catch(error => {
+ workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (workerNodeKey === -1) {
+ return
+ }
if (this.workerNodes.length <= 1) {
return
}
env: this.opts.env,
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.opts.tasksQueueOptions?.size ??
+ getDefaultTasksQueueOptions(this.maxSize).size
}
)
// Flag the worker node as ready at pool startup.
* @defaultValue true
*/
readonly tasksStealingOnBackPressure?: boolean
+ /**
+ * Queued tasks finished timeout in milliseconds at worker node termination.
+ *
+ * @defaultValue 1000
+ */
+ readonly tasksFinishedTimeout?: number
}
/**
} from './worker'
import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+export const getDefaultTasksQueueOptions = (
+ poolMaxSize: number
+): Required<TasksQueueOptions> => {
+ return {
+ size: Math.pow(poolMaxSize, 2),
+ concurrency: 1,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
+ }
+}
+
export const checkFilePath = (filePath: string): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
>(
workerNode: IWorkerNode<Worker, Data>,
workerNodeEvent: string,
- numberOfEventsToWait: number
+ numberOfEventsToWait: number,
+ timeout: number
): Promise<number> => {
return await new Promise<number>(resolve => {
let events = 0
resolve(events)
}
})
+ if (timeout > 0) {
+ setTimeout(() => {
+ resolve(events)
+ }, timeout)
+ }
})
}
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
} from '../../lib/circular-array.js'
import {
createWorker,
+ getDefaultTasksQueueOptions,
updateMeasurementStatistics
} from '../../lib/pools/utils.js'
import { WorkerTypes } from '../../lib/index.js'
describe('Pool utils test suite', () => {
+ it('Verify getDefaultTasksQueueOptions() behavior', () => {
+ const poolMaxSize = 4
+ expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({
+ concurrency: 1,
+ size: Math.pow(poolMaxSize, 2),
+ taskStealing: true,
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 1000
+ })
+ })
+
it('Verify updateMeasurementStatistics() behavior', () => {
const measurementStatistics = {
history: new CircularArray()