import type {
IWorker,
IWorkerNode,
- TaskStatistics,
WorkerInfo,
WorkerNodeEventDetail,
- WorkerType,
- WorkerUsage
+ WorkerType
} from './worker.js'
import {
Measurements,
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ opts.workerChoiceStrategyOptions!
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidTasksQueueOptions(opts.tasksQueueOptions!)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ opts.tasksQueueOptions!
)
}
} else {
worker: this.worker,
started: this.started,
ready: this.ready,
- strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ strategy: this.opts.workerChoiceStrategy!,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
- this,
this.opts.workerChoiceStrategyOptions
)
}
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueOptions(tasksQueueOptions!)
}
/** @inheritDoc */
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
if (this.opts.tasksQueueOptions.taskStealing === true) {
this.unsetTaskStealing()
this.setTaskStealing()
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
workerNode =>
workerNode.info.ready &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) === -1
)
}
if (this.opts.enableTasksQueue === true) {
return (
this.workerNodes[workerNodeKey].usage.tasks.executing >=
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const workerId = this.getWorkerInfo(workerNodeKey).id!
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
new Error(
`Task function operation '${
message.taskFunctionOperation as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}' failed on worker ${message.workerId} with error: '${
- message.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.workerError!.message
}'`
)
)
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ }' failed on worker ${errorResponse!
+ .workerId!} with error: '${
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ errorResponse!.workerError!.message
}'`
)
)
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
timestamp,
taskId: randomUUID()
}
- this.promiseResponseMap.set(task.taskId as string, {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
resolve,
reject,
workerNodeKey,
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ `Kill message handling failed on worker ${message.workerId!}`
)
)
}
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- task.name as string
- ) != null
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+ null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.taskPerformance!.name
) != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
checkActive: true
})
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
)
this.handleTask(
destinationWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.dequeueTask(workerNodeKey)!
)
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.stolen
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
}
if (
this.cannotStealTask() ||
- (this.info.stealingWorkerNodes as number) >
- Math.floor(this.workerNodes.length / 2)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
) {
if (previousStolenTask != null) {
this.getWorkerInfo(workerNodeKey).stealing = false
this.tasksQueueSize(workerNodeKey) > 0)
) {
this.getWorkerInfo(workerNodeKey).stealing = false
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames as string[]) {
+ .taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
taskName
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
stolenTask != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionTasksWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
if (
taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
(previousStolenTask != null &&
) {
this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
} else {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
}
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = sourceWorkerNode.popTask() as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
if (
this.cannotStealTask() ||
- (this.info.stealingWorkerNodes as number) >
- Math.floor(this.workerNodes.length / 2)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
) {
return
}
const { workerId } = eventDetail
const sizeOffset = 1
- if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
const sourceWorkerNode =
!workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - sizeOffset
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.size! - sizeOffset
) {
this.getWorkerInfo(workerNodeKey).stealing = true
- const task = sourceWorkerNode.popTask() as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
this.getWorkerInfo(workerNodeKey).stealing = false
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
+ if (ready == null || !ready) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ throw new Error(`Worker ${workerId!} failed to initialize`)
}
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
- const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.delete(taskId!)
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
- workerId: workerId as number,
+ workerNode.emit('idle', {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: workerId!,
workerNodeKey
})
}
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()