+import { AsyncResource } from 'node:async_hooks'
import { randomUUID } from 'node:crypto'
+import { EventEmitterAsyncResource } from 'node:events'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
-import { EventEmitterAsyncResource } from 'node:events'
-import { AsyncResource } from 'node:async_hooks'
+
import type {
MessageValue,
PromiseResponseWrapper,
Task
} from '../utility-types.js'
import {
+ average,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
- average,
exponentialDelay,
isKillBehavior,
isPlainObject,
round,
sleep
} from '../utils.js'
-import { KillBehaviors } from '../worker/worker-options.js'
import type { TaskFunction } from '../worker/task-functions.js'
+import { KillBehaviors } from '../worker/worker-options.js'
import {
type IPool,
PoolEvents,
PoolTypes,
type TasksQueueOptions
} from './pool.js'
-import type {
- IWorker,
- IWorkerNode,
- WorkerInfo,
- WorkerNodeEventDetail,
- WorkerType
-} from './worker.js'
import {
Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
-import { version } from './version.js'
-import { WorkerNode } from './worker-node.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
updateWaitTimeWorkerUsage,
waitWorkerNodeEvents
} from './utils.js'
+import { version } from './version.js'
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerNodeEventDetail,
+ WorkerType
+} from './worker.js'
+import { WorkerNode } from './worker-node.js'
/**
* Base class that implements some shared logic for all poolifier pools.
/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
*/
- protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
Worker,
Data,
Response
* Whether the pool is destroying or not.
*/
private destroying: boolean
+ /**
+ * Whether the minimum number of workers is starting or not.
+ */
+ private startingMinimumNumberOfWorkers: boolean
/**
* Whether the pool ready event has been emitted or not.
*/
this.starting = false
this.destroying = false
this.readyEventEmitted = false
+ this.startingMinimumNumberOfWorkers = false
if (this.opts.startWorkers === true) {
this.start()
}
}
}
- private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ private checkMinimumNumberOfWorkers (
+ minimumNumberOfWorkers: number | undefined
+ ): void {
if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!)
+ checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategyOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.workerChoiceStrategyOptions!
+ opts.workerChoiceStrategyOptions
)
if (opts.workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- checkValidTasksQueueOptions(opts.tasksQueueOptions!)
+ checkValidTasksQueueOptions(opts.tasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- opts.tasksQueueOptions!
+ opts.tasksQueueOptions
)
}
} else {
}
private checkValidWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
if (
workerChoiceStrategyOptions != null &&
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
strategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate &&
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && { utilization: round(this.utilization) }),
+ .runTime.aggregate === true &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ utilization: round(this.utilization)
+ }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
...(this.opts.enableTasksQueue === true && {
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
0
)
}),
0
),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .runTime.aggregate && {
+ .runTime.aggregate === true && {
runTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.runTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.runTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
}
}),
...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
- .waitTime.aggregate && {
+ .waitTime.aggregate === true && {
waitTime: {
minimum: round(
min(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ workerNode => workerNode.usage.waitTime.minimum ?? Infinity
)
)
),
maximum: round(
max(
...this.workerNodes.map(
- workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
)
)
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
* The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
)
}
+ /**
+ * The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
+ }
+
/**
* The approximate pool utilization.
*
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.runTime.aggregate ?? 0),
0
)
const totalTasksWaitTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
0
)
return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
this.opts.workerChoiceStrategy
)
if (workerChoiceStrategyOptions != null) {
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
- workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
- this.workerChoiceStrategyContext.setOptions(
+ this.workerChoiceStrategyContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
}
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.setTasksQueueOptions(tasksQueueOptions!)
+ this.setTasksQueueOptions(tasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ public setTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions | undefined
+ ): void {
if (this.opts.enableTasksQueue === true) {
checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
}
private buildTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
+ tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
return {
...getDefaultTasksQueueOptions(
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const workerId = this.getWorkerInfo(workerNodeKey).id!
+ const workerId = this.getWorkerInfo(workerNodeKey)?.id
if (
message.taskFunctionOperationStatus != null &&
message.workerId === workerId
) {
if (message.taskFunctionOperationStatus) {
resolve(true)
- } else if (!message.taskFunctionOperationStatus) {
+ } else {
reject(
new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${message.workerId} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- message.workerError!.message
- }'`
+ `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- }' failed on worker ${errorResponse!
- .workerId!} with error: '${
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- errorResponse!.workerError!.message
+ }' failed on worker ${errorResponse?.workerId} with error: '${
+ errorResponse?.workerError?.message
}'`
)
)
})
}
+ /**
+ * Starts the minimum number of workers.
+ */
+ private startMinimumNumberOfWorkers (): void {
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
/** @inheritdoc */
public start (): void {
if (this.started) {
throw new Error('Cannot start a destroying pool')
}
this.starting = true
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.minimumNumberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
+ this.startMinimumNumberOfWorkers()
this.starting = false
this.started = true
}
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
- this.emitter?.removeAllListeners()
this.readyEventEmitted = false
this.destroying = false
this.started = false
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (this.workerNodes?.[workerNodeKey] == null) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
resolve()
return
}
} else if (message.kill === 'failure') {
reject(
new Error(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- `Kill message handling failed on worker ${message.workerId!}`
+ `Kill message handling failed on worker ${message.workerId}`
)
)
}
workerNodeKey: number,
task: Task<Data>
): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
message: MessageValue<Response>
): void {
let needWorkerChoiceStrategyUpdate = false
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
needWorkerChoiceStrategyUpdate = true
}
if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.workerChoiceStrategyContext?.update(workerNodeKey)
}
}
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
return workerNodeKey
}
}
- return this.workerChoiceStrategyContext.execute()
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.workerChoiceStrategyContext!.execute()
}
/**
'error',
this.opts.errorHandler ?? EMPTY_FUNCTION
)
- workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
if (
) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
- } else {
- this.createAndSetupWorkerNode()
+ } else if (!this.startingMinimumNumberOfWorkers) {
+ this.startMinimumNumberOfWorkers()
}
}
if (
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
- workerNode?.terminate().catch(error => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode?.terminate().catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
})
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
this.removeWorkerNode(workerNode)
+ if (
+ this.started &&
+ !this.startingMinimumNumberOfWorkers &&
+ !this.destroying
+ ) {
+ this.startMinimumNumberOfWorkers()
+ }
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)
- const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
// Kill message received from worker
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
) {
// Flag the worker node as not ready immediately
this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
- this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
taskFunctionOperation: 'add',
taskFunctionName,
taskFunction: taskFunction.toString()
- }).catch(error => {
+ }).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
}
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
- this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerReady === true ||
+ this.workerChoiceStrategyContext?.getStrategyPolicy()
+ .dynamicWorkerUsage === true
) {
workerNode.info.ready = true
}
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime.aggregate,
- elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu.aggregate
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate ?? false,
+ elu:
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
+ .aggregate ?? false
}
})
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
workerNodeKey: number
): void {
const workerNode = this.workerNodes[workerNodeKey]
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey property must be defined'
+ "WorkerNode event detail 'workerNodeKey' property must be defined"
)
}
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
- if (previousStolenTask != null) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ if (workerInfo != null && previousStolenTask != null) {
+ workerInfo.stealing = false
}
return
}
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
+ workerInfo != null &&
previousStolenTask != null &&
workerNodeTasksUsage.sequentiallyStolen > 0 &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskName of this.workerNodes[workerNodeKey].info
- .taskFunctionNames!) {
+ for (const taskName of workerInfo.taskFunctionNames!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
taskName
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
- this.getWorkerInfo(workerNodeKey).stealing = true
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
- .catch(EMPTY_FUNCTION)
+ .catch((error: unknown) => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
}
private readonly workerNodeStealTask = (
): void => {
if (
this.cannotStealTask() ||
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
+ (this.info.stealingWorkerNodes ?? 0) >
+ Math.floor(this.workerNodes.length / 2)
) {
return
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts.tasksQueueOptions!.size! - sizeOffset
) {
- this.getWorkerInfo(workerNodeKey).stealing = true
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ throw new Error(
+ `Worker node with key '${workerNodeKey}' not found in pool`
+ )
+ }
+ workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const task = sourceWorkerNode.popTask()!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
- this.getWorkerInfo(workerNodeKey).stealing = false
+ workerInfo.stealing = false
}
}
}
this.handleTaskExecutionResponse(message)
} else if (taskFunctionNames != null) {
// Task function names message received from worker
- this.getWorkerInfo(
+ const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
- ).taskFunctionNames = taskFunctionNames
+ )
+ if (workerInfo != null) {
+ workerInfo.taskFunctionNames = taskFunctionNames
+ }
+ }
+ }
+
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- throw new Error(`Worker ${workerId!} failed to initialize`)
+ throw new Error(`Worker ${workerId} failed to initialize`)
}
const workerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.readyEventEmitted = true
- this.emitter?.emit(PoolEvents.ready, this.info)
- }
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
- if (this.opts.enableTasksQueue === true && !this.destroying) {
+ if (
+ this.opts.enableTasksQueue === true &&
+ !this.destroying &&
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ workerNode != null
+ ) {
const workerNodeTasksUsage = workerNode.usage.tasks
if (
this.tasksQueueSize(workerNodeKey) > 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
workerNode.emit('idle', {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- workerId: workerId!,
+ workerId,
workerNodeKey
})
}
* @param workerNodeKey - The worker node key.
* @returns The worker information.
*/
- protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
return this.workerNodes[workerNodeKey]?.info
}
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
*
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
- this.getWorkerInfo(workerNodeKey).ready = false
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo != null) {
+ workerInfo.ready = false
+ }
}
private hasBackPressure (): boolean {