-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
-import type { TaskFunction } from '../worker/task-functions'
+} from '../utils.js'
+import { KillBehaviors } from '../worker/worker-options.js'
+import type { TaskFunction } from '../worker/task-functions.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { version } from './version.js'
+import { WorkerNode } from './worker-node.js'
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ }' failed on worker ${errorResponse!
+ .workerId!} with error: '${
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ errorResponse!.workerError!.message
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ `Kill message handling failed on worker ${message.workerId!}`
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- (this.info.stealingWorkerNodes as number) >
- Math.floor(this.workerNodes.length / 2)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- (this.info.stealingWorkerNodes as number) >
- Math.floor(this.workerNodes.length / 2)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
!workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
!workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
+ if (ready == null || !ready) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ throw new Error(`Worker ${workerId!} failed to initialize`)
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
- workerId: workerId as number,
+ workerNode.emit('idle', {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: workerId!,
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {