MessageValue,
PromiseResponseWrapper,
Task
-} from '../utility-types'
+} from '../utility-types.js'
import {
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
min,
round,
sleep
-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
-import type { TaskFunction } from '../worker/task-functions'
+} from '../utils.js'
+import { KillBehaviors } from '../worker/worker-options.js'
+import type { TaskFunction } from '../worker/task-functions.js'
import {
type IPool,
PoolEvents,
type PoolType,
PoolTypes,
type TasksQueueOptions
-} from './pool'
+} from './pool.js'
import type {
IWorker,
IWorkerNode,
- TaskStatistics,
WorkerInfo,
WorkerNodeEventDetail,
- WorkerType,
- WorkerUsage
-} from './worker'
+ WorkerType
+} from './worker.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { version } from './version.js'
+import { WorkerNode } from './worker-node.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
-} from './utils'
+} from './utils.js'
/**
* Base class that implements some shared logic for all poolifier pools.
/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
*/
- protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
Worker,
Data,
Response
}
}
- private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ private checkMinimumNumberOfWorkers (
+ minimumNumberOfWorkers: number | undefined
+ ): void {
if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
+ checkValidTasksQueueOptions(opts.tasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
+ opts.tasksQueueOptions
)
}
} else {
}
private checkValidWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
if (
workerChoiceStrategyOptions != null &&
worker: this.worker,
started: this.started,
ready: this.ready,
- strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ strategy: this.opts.workerChoiceStrategy!,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate &&
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && { utilization: round(this.utilization) }),
+ ?.runTime.aggregate === true &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ utilization: round(this.utilization)
+ }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
)
}),
0
),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate && {
+ ?.runTime.aggregate === true && {
runTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.runTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.runTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
}
}),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ ?.waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.waitTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.runTime.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
this.opts.workerChoiceStrategy
)
if (workerChoiceStrategyOptions != null) {
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
- this.workerChoiceStrategyContext.setOptions(
- this,
+ this.workerChoiceStrategyContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
}
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+ this.setTasksQueueOptions(tasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ public setTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions | undefined
+ ): void {
if (this.opts.enableTasksQueue === true) {
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
if (this.opts.tasksQueueOptions.taskStealing === true) {
this.unsetTaskStealing()
this.setTaskStealing()
}
private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
return {
...getDefaultTasksQueueOptions(
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
workerNode =>
workerNode.info.ready &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) === -1
)
}
if (this.opts.enableTasksQueue === true) {
return (
this.workerNodes[workerNodeKey].usage.tasks.executing >=
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
) {
if (message.taskFunctionOperationStatus) {
resolve(true)
- } else if (!message.taskFunctionOperationStatus) {
+ } else {
reject(
new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- }' failed on worker ${message.workerId} with error: '${
- message.workerError?.message as string
- }'`
+ `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ }' failed on worker ${errorResponse?.workerId} with error: '${
+ errorResponse?.workerError?.message
}'`
)
)
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
timestamp,
taskId: randomUUID()
}
- this.promiseResponseMap.set(task.taskId as string, {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
resolve,
reject,
workerNodeKey,
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (this.workerNodes?.[workerNodeKey] == null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
resolve()
return
}
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ `Kill message handling failed on worker ${message.workerId}`
)
)
}
workerNodeKey: number,
task: Task<Data>
): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- task.name as string
- ) != null
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+ null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
message: MessageValue<Response>
): void {
let needWorkerChoiceStrategyUpdate = false
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.taskPerformance!.name
) != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
needWorkerChoiceStrategyUpdate = true
}
if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.workerChoiceStrategyContext?.update(workerNodeKey)
}
}
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
return workerNodeKey
}
}
- return this.workerChoiceStrategyContext.execute()
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.workerChoiceStrategyContext!.execute()
}
/**
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
checkActive: true
})
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerReady === true ||
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate,
- elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu.aggregate
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ?.runTime.aggregate ?? false,
+ elu:
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
+ .aggregate ?? false
}
})
}
)
this.handleTask(
destinationWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.dequeueTask(workerNodeKey)!
)
}
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.stolen
}
}
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
}
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey property must be defined'
+ "WorkerNode event detail 'workerNodeKey' property must be defined"
)
}
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
this.cannotStealTask() ||
- (this.info.stealingWorkerNodes as number) >
+ (this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
- if (previousStolenTask != null) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ if (workerInfo != null && previousStolenTask != null) {
+ workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
+ workerInfo != null &&
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames as string[]) {
+ .taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
taskName
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
- this.getWorkerInfo(workerNodeKey).stealing = true
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
stolenTask != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionTasksWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
if (
taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
(previousStolenTask != null &&
) {
this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
} else {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
}
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = sourceWorkerNode.popTask() as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
if (
this.cannotStealTask() ||
- (this.info.stealingWorkerNodes as number) >
+ (this.info.stealingWorkerNodes ?? 0) >
Math.floor(this.workerNodes.length / 2)
) {
return
}
const { workerId } = eventDetail
const sizeOffset = 1
- if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
const sourceWorkerNode =
!workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - sizeOffset
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- this.getWorkerInfo(workerNodeKey).stealing = true
- const task = sourceWorkerNode.popTask() as Task<Data>
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
- this.getWorkerInfo(workerNodeKey).stealing = false
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ workerInfo.stealing = false
}
}
}
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
- this.getWorkerInfo(
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ )
+ if (workerInfo != null) {
+ workerInfo.taskFunctionNames = taskFunctionNames
+ }
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
+ if (ready == null || !ready) {
+ throw new Error(`Worker ${workerId} failed to initialize`)
}
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
- this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
- const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.delete(taskId!)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
- workerId: workerId as number,
+ workerNode.emit('idle', {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: workerId!,
workerNodeKey
})
}
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
return this.workerNodes[workerNodeKey]?.info
}
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
- }
-
- /** @inheritDoc */
- public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
- return (
- this.opts.enableTasksQueue === true &&
- this.workerNodes[workerNodeKey].hasBackPressure()
- )
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()