MessageValue,
PromiseResponseWrapper,
Task
-} from '../utility-types'
+} from '../utility-types.js'
import {
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
min,
round,
sleep
-} from '../utils'
-import { KillBehaviors } from '../worker/worker-options'
-import type { TaskFunction } from '../worker/task-functions'
+} from '../utils.js'
+import { KillBehaviors } from '../worker/worker-options.js'
+import type { TaskFunction } from '../worker/task-functions.js'
import {
type IPool,
PoolEvents,
type PoolType,
PoolTypes,
type TasksQueueOptions
-} from './pool'
+} from './pool.js'
import type {
IWorker,
IWorkerNode,
- TaskStatistics,
WorkerInfo,
WorkerNodeEventDetail,
- WorkerType,
- WorkerUsage
-} from './worker'
+ WorkerType
+} from './worker.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
-} from './selection-strategies/selection-strategies-types'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
-import { version } from './version'
-import { WorkerNode } from './worker-node'
+} from './selection-strategies/selection-strategies-types.js'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { version } from './version.js'
+import { WorkerNode } from './worker-node.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
updateTaskStatisticsWorkerUsage,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
-} from './utils'
+} from './utils.js'
/**
* Base class that implements some shared logic for all poolifier pools.
/**
* Constructs a new poolifier pool.
*
- * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
- * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
*/
public constructor (
protected readonly minimumNumberOfWorkers: number,
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- checkValidWorkerChoiceStrategy(
- opts.workerChoiceStrategy as WorkerChoiceStrategy
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ opts.workerChoiceStrategyOptions!
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ checkValidTasksQueueOptions(opts.tasksQueueOptions!)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ opts.tasksQueueOptions!
)
}
} else {
worker: this.worker,
started: this.started,
ready: this.ready,
- strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ strategy: this.opts.workerChoiceStrategy!,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
- this,
this.opts.workerChoiceStrategyOptions
)
}
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueOptions(tasksQueueOptions!)
}
/** @inheritDoc */
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
- this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
if (this.opts.tasksQueueOptions.taskStealing === true) {
this.unsetTaskStealing()
this.setTaskStealing()
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
workerNode =>
workerNode.info.ready &&
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) === -1
)
}
if (this.opts.enableTasksQueue === true) {
return (
this.workerNodes[workerNodeKey].usage.tasks.executing >=
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const workerId = this.getWorkerInfo(workerNodeKey).id!
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
new Error(
`Task function operation '${
message.taskFunctionOperation as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}' failed on worker ${message.workerId} with error: '${
- message.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.workerError!.message
}'`
)
)
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${
- errorResponse?.workerId as number
- } with error: '${
- errorResponse?.workerError?.message as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ }' failed on worker ${errorResponse!
+ .workerId!} with error: '${
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ errorResponse!.workerError!.message
}'`
)
)
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
)
}
timestamp,
taskId: randomUUID()
}
- this.promiseResponseMap.set(task.taskId as string, {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
resolve,
reject,
workerNodeKey,
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
- reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+ if (this.workerNodes?.[workerNodeKey] == null) {
+ resolve()
return
}
const killMessageListener = (message: MessageValue<Response>): void => {
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${
- message.workerId as number
- }`
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ `Kill message handling failed on worker ${message.workerId!}`
)
)
}
}
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
- this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- task.name as string
- ) != null
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+ null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
this.workerChoiceStrategyContext,
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message.taskPerformance!.name
) != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(
- message.taskPerformance?.name as string
- ) as WorkerUsage
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
this.workerChoiceStrategyContext,
*
* @returns Whether to create a dynamic worker or not.
*/
- private shallCreateDynamicWorker (): boolean {
- return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
- }
+ protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
this.emitter?.emit(PoolEvents.error, error)
if (
this.started &&
- !this.starting &&
!this.destroying &&
this.opts.restartWorkerOnError === true
) {
this.createAndSetupWorkerNode()
}
}
- if (this.started && this.opts.enableTasksQueue === true) {
+ if (
+ this.started &&
+ !this.destroying &&
+ this.opts.enableTasksQueue === true
+ ) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode?.terminate().catch(error => {
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
checkActive: true
})
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
})
}
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
}
private redistributeQueuedTasks (workerNodeKey: number): void {
- if (workerNodeKey === -1) {
- return
- }
- if (this.workerNodes.length <= 1) {
+ if (workerNodeKey === -1 || this.cannotStealTask()) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
)
this.handleTask(
destinationWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.dequeueTask(workerNodeKey)!
)
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.stolen
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
}
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
workerNode.getTaskFunctionWorkerUsage(taskName) != null
) {
- const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
- taskName
- ) as WorkerUsage
+ const taskFunctionWorkerUsage =
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerNode.getTaskFunctionWorkerUsage(taskName)!
taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey attribute must be defined'
+ 'WorkerNode event detail workerNodeKey property must be defined'
)
}
+ if (
+ this.cannotStealTask() ||
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames as string[]) {
+ .taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
taskName
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ this.getWorkerInfo(workerNodeKey).stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
stolenTask != null
) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const taskFunctionTasksWorkerUsage = this.workerNodes[
workerNodeKey
- ].getTaskFunctionWorkerUsage(stolenTask.name as string)
- ?.tasks as TaskStatistics
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
if (
taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
(previousStolenTask != null &&
) {
this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
} else {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- stolenTask.name as string
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ stolenTask.name!
)
}
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = sourceWorkerNode.popTask() as Task<Data>
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
return task
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
const { workerId } = eventDetail
const sizeOffset = 1
- if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
return
}
const sourceWorkerNode =
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
- (this.opts.tasksQueueOptions?.size as number) - sizeOffset
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- const task = sourceWorkerNode.popTask() as Task<Data>
+ this.getWorkerInfo(workerNodeKey).stealing = true
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
- this.updateTaskStolenStatisticsWorkerUsage(
- workerNodeKey,
- task.name as string
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
+ this.getWorkerInfo(workerNodeKey).stealing = false
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
- throw new Error(`Worker ${workerId as number} failed to initialize`)
+ if (ready == null || !ready) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ throw new Error(`Worker ${workerId!} failed to initialize`)
}
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { workerId, taskId, workerError, data } = message
- const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
}
asyncResource?.emitDestroy()
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(taskId as string)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.delete(taskId!)
workerNode?.emit('taskFinished', taskId)
if (this.opts.enableTasksQueue === true && !this.destroying) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
}
if (
workerNodeTasksUsage.executing === 0 &&
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
- workerId: workerId as number,
+ workerNode.emit('idle', {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: workerId!,
workerNodeKey
})
}
}
}
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
* Gets the worker information given its worker node key.
this.getWorkerInfo(workerNodeKey).ready = false
}
- /** @inheritDoc */
- public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
- return (
- this.opts.enableTasksQueue === true &&
- this.workerNodes[workerNodeKey].hasBackPressure()
- )
- }
-
private hasBackPressure (): boolean {
return (
this.opts.enableTasksQueue === true &&
protected flushTasksQueue (workerNodeKey: number): number {
let flushedTasks = 0
while (this.tasksQueueSize(workerNodeKey) > 0) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
++flushedTasks
}
this.workerNodes[workerNodeKey].clearTasksQueue()