: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
- reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+ if (this.workerNodes?.[workerNodeKey] == null) {
+ resolve()
return
}
const killMessageListener = (message: MessageValue<Response>): void => {
*
* @returns Whether to create a dynamic worker or not.
*/
- private shallCreateDynamicWorker (): boolean {
- return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
- }
+ protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
})
}
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
if (workerNodeKey === -1) {
return
}
- if (this.workerNodes.length <= 1) {
+ if (this.cannotStealTask()) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey attribute must be defined'
+ 'WorkerNode event detail workerNodeKey property must be defined'
)
}
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ this.getWorkerInfo(workerNodeKey).stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
const { workerId } = eventDetail
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
+ this.getWorkerInfo(workerNodeKey).stealing = true
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
+ this.getWorkerInfo(workerNodeKey).stealing = false
}
}
}
}
}
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
* Gets the worker information given its worker node key.