/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
*/
- protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
Worker,
Data,
Response
}
}
- private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ private checkMinimumNumberOfWorkers (
+ minimumNumberOfWorkers: number | undefined
+ ): void {
if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.workerChoiceStrategyOptions!
+ opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidTasksQueueOptions(opts.tasksQueueOptions!)
+ checkValidTasksQueueOptions(opts.tasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.tasksQueueOptions!
+ opts.tasksQueueOptions
)
}
} else {
}
private checkValidWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
if (
workerChoiceStrategyOptions != null &&
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate &&
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && { utilization: round(this.utilization) }),
+ ?.runTime.aggregate === true &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ utilization: round(this.utilization)
+ }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
)
}),
0
),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate && {
+ ?.runTime.aggregate === true && {
runTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.runTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.runTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
}
}),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ ?.waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.waitTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.runTime.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
this.opts.workerChoiceStrategy
)
if (workerChoiceStrategyOptions != null) {
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
- this.workerChoiceStrategyContext.setOptions(
+ this.workerChoiceStrategyContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
}
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.setTasksQueueOptions(tasksQueueOptions!)
+ this.setTasksQueueOptions(tasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ public setTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions | undefined
+ ): void {
if (this.opts.enableTasksQueue === true) {
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
}
private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
return {
...getDefaultTasksQueueOptions(
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const workerId = this.getWorkerInfo(workerNodeKey).id!
+ const workerId = this.getWorkerInfo(workerNodeKey).id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
) {
if (message.taskFunctionOperationStatus) {
resolve(true)
- } else if (!message.taskFunctionOperationStatus) {
+ } else {
reject(
new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${message.workerId} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- message.workerError!.message
- }'`
+ `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${errorResponse!
- .workerId!} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- errorResponse!.workerError!.message
+ }' failed on worker ${errorResponse?.workerId} with error: '${
+ errorResponse?.workerError?.message
}'`
)
)
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (this.workerNodes?.[workerNodeKey] == null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
resolve()
return
}
} else if (message.kill === 'failure') {
reject(
new Error(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- `Kill message handling failed on worker ${message.workerId!}`
+ `Kill message handling failed on worker ${message.workerId}`
)
)
}
workerNodeKey: number,
task: Task<Data>
): void {
- if (this.workerNodes[workerNodeKey]?.usage != null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey].usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
updateWaitTimeWorkerUsage(
message: MessageValue<Response>
): void {
let needWorkerChoiceStrategyUpdate = false
- if (this.workerNodes[workerNodeKey]?.usage != null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey].usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
updateRunTimeWorkerUsage(
needWorkerChoiceStrategyUpdate = true
}
if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.workerChoiceStrategyContext?.update(workerNodeKey)
}
}
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
- workerInfo != null &&
Array.isArray(workerInfo.taskFunctionNames) &&
workerInfo.taskFunctionNames.length > 2
)
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
return workerNodeKey
}
}
- return this.workerChoiceStrategyContext.execute()
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.workerChoiceStrategyContext!.execute()!
}
/**
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode?.terminate().catch(error => {
+ workerNode.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
})
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerReady === true ||
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
workerNode.info.ready = true
}
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate,
- elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu.aggregate
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ?.runTime.aggregate ?? false,
+ elu:
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
+ .aggregate ?? false
}
})
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
- if (workerNode?.usage != null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (workerNode.usage != null) {
++workerNode.usage.tasks.stolen
}
if (
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
- if (workerNode?.usage != null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (workerNode.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
}
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
- if (workerNode?.usage != null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (workerNode.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
}
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey property must be defined'
+ "WorkerNode event detail 'workerNodeKey' property must be defined"
)
}
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
if (previousStolenTask != null) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
}
return
}
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
- this.getWorkerInfo(workerNodeKey).stealing = true
+ workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
): void => {
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
return
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- this.getWorkerInfo(workerNodeKey).stealing = true
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- throw new Error(`Worker ${workerId!} failed to initialize`)
+ throw new Error(`Worker ${workerId} failed to initialize`)
}
const workerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
- workerNode?.emit('taskFinished', taskId)
+ workerNode.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
* @returns The worker information.
*/
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- return this.workerNodes[workerNodeKey]?.info
+ const workerInfo = this.workerNodes[workerNodeKey]?.info
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (workerInfo == null) {
+ throw new Error(`Worker node with key '${workerNodeKey}' not found`)
+ }
+ return workerInfo
}
/**
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
}
}
}
-export const checkFilePath = (filePath: string): void => {
+export const checkFilePath = (filePath: string | undefined): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
}
}
}
-export const checkDynamicPoolSize = (min: number, max: number): void => {
+export const checkDynamicPoolSize = (
+ min: number,
+ max: number | undefined
+): void => {
if (max == null) {
throw new TypeError(
'Cannot instantiate a dynamic pool without specifying the maximum pool size'
}
export const checkValidWorkerChoiceStrategy = (
- workerChoiceStrategy: WorkerChoiceStrategy
+ workerChoiceStrategy: WorkerChoiceStrategy | undefined
): void => {
if (
workerChoiceStrategy != null &&
}
export const checkValidTasksQueueOptions = (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): void => {
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
export const checkWorkerNodeArguments = (
- type: WorkerType,
- filePath: string,
- opts: WorkerNodeOptions
+ type: WorkerType | undefined,
+ filePath: string | undefined,
+ opts: WorkerNodeOptions | undefined
): void => {
if (type == null) {
throw new TypeError('Cannot construct a worker node without a worker type')
*/
const updateMeasurementStatistics = (
measurementStatistics: MeasurementStatistics,
- measurementRequirements: MeasurementStatisticsRequirements,
- measurementValue: number
+ measurementRequirements: MeasurementStatisticsRequirements | undefined,
+ measurementValue: number | undefined
): void => {
- if (measurementRequirements.aggregate) {
+ if (
+ measurementRequirements != null &&
+ measurementValue != null &&
+ measurementRequirements.aggregate
+ ) {
measurementStatistics.aggregate =
(measurementStatistics.aggregate ?? 0) + measurementValue
measurementStatistics.minimum = min(
measurementValue,
measurementStatistics.maximum ?? -Infinity
)
- if (
- (measurementRequirements.average || measurementRequirements.median) &&
- measurementValue != null
- ) {
+ if (measurementRequirements.average || measurementRequirements.median) {
measurementStatistics.history.push(measurementValue)
if (measurementRequirements.average) {
measurementStatistics.average = average(measurementStatistics.history)
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
task: Task<Data>
): void => {
const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
updateMeasurementStatistics(
workerUsage.waitTime,
- workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.waitTime,
taskWaitTime
)
}
): void => {
const workerTaskStatistics = workerUsage.tasks
if (
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerTaskStatistics.executing != null &&
workerTaskStatistics.executing > 0
) {
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void => {
}
updateMeasurementStatistics(
workerUsage.runTime,
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime,
message.taskPerformance?.runTime ?? 0
)
}
Data = unknown,
Response = unknown
>(
- workerChoiceStrategyContext: WorkerChoiceStrategyContext<
- Worker,
- Data,
- Response
- >,
+ workerChoiceStrategyContext:
+ | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void => {
if (message.workerError != null) {
return
}
- const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
- workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ const eluTaskStatisticsRequirements =
+ workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
updateMeasurementStatistics(
workerUsage.elu.active,
eluTaskStatisticsRequirements,
eluTaskStatisticsRequirements,
message.taskPerformance?.elu?.idle ?? 0
)
- if (eluTaskStatisticsRequirements.aggregate) {
+ if (eluTaskStatisticsRequirements?.aggregate === true) {
if (message.taskPerformance?.elu != null) {
if (workerUsage.elu.utilization != null) {
workerUsage.elu.utilization =
case WorkerTypes.thread:
return new Worker(filePath, {
env: SHARE_ENV,
- ...opts?.workerOptions
+ ...opts.workerOptions
}) as unknown as Worker
case WorkerTypes.cluster:
- return cluster.fork(opts?.env) as unknown as Worker
+ return cluster.fork(opts.env) as unknown as Worker
default:
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new Error(`Unknown worker type '${type}'`)