-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
-import type { TaskFunction } from '../worker/task-functions'
+} from '../utils.js'
+import { KillBehaviors } from '../worker/worker-options.js'
+import type { TaskFunction } from '../worker/task-functions.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { version } from './version.js'
+import { WorkerNode } from './worker-node.js'
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ }' failed on worker ${errorResponse!
+ .workerId!} with error: '${
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ errorResponse!.workerError!.message
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ `Kill message handling failed on worker ${message.workerId!}`
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode?.terminate().catch(error => {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode?.terminate().catch(error => {
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
+ if (
+ this.cannotStealTask() ||
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
stolenTask != null
) {
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
stolenTask != null
) {
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ ) {
- const task = sourceWorkerNode.popTask() as Task<Data>
+ this.getWorkerInfo(workerNodeKey).stealing = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ this.getWorkerInfo(workerNodeKey).stealing = false
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
+ if (ready == null || !ready) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ throw new Error(`Worker ${workerId!} failed to initialize`)
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
- workerId: workerId as number,
+ workerNode.emit('idle', {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: workerId!,
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {