DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
+ exponentialDelay,
isKillBehavior,
isPlainObject,
max,
median,
min,
- round
+ round,
+ sleep
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
import type { TaskFunction } from '../worker/task-functions'
}
}
+ private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.sequentiallyStolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ workerNode.usage.tasks.sequentiallyStolen = 0
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
+ }
+
private readonly handleIdleWorkerNodeEvent = (
- event: CustomEvent<WorkerNodeEventDetail>
+ event: CustomEvent<WorkerNodeEventDetail>,
+ previousStolenTask?: Task<Data>
): void => {
- const { workerId } = event.detail
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const { workerNodeKey } = event.detail
+ if (workerNodeKey == null) {
+ throw new Error(
+ 'WorkerNode event detail workerNodeKey attribute must be defined'
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ previousStolenTask != null &&
+ workerNodeTasksUsage.sequentiallyStolen > 0 &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
+ ) {
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ previousStolenTask.name as string
+ )
+ return
+ }
+ const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+ .then(() => {
+ this.handleIdleWorkerNodeEvent(event, stolenTask)
+ return undefined
+ })
+ .catch(EMPTY_FUNCTION)
+ }
+
+ private readonly workerNodeStealTask = (
+ workerNodeKey: number
+ ): Task<Data> | undefined => {
const workerNodes = this.workerNodes
.slice()
.sort(
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
const sourceWorkerNode = workerNodes.find(
- workerNode =>
- workerNode.info.ready &&
- workerNode.info.id !== workerId &&
- workerNode.usage.tasks.queued > 0
+ (sourceWorkerNode, sourceWorkerNodeKey) =>
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNodeKey !== workerNodeKey &&
+ sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
} else {
- this.enqueueTask(destinationWorkerNodeKey, task)
+ this.enqueueTask(workerNodeKey, task)
}
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
+ workerNodeKey,
+ task.name as string
+ )
this.updateTaskStolenStatisticsWorkerUsage(
- destinationWorkerNodeKey,
+ workerNodeKey,
task.name as string
)
+ return task
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- if (message.ready === false) {
- throw new Error(
- `Worker ${message.workerId as number} failed to initialize`
- )
+ const { workerId, ready, taskFunctionNames } = message
+ if (ready === false) {
+ throw new Error(`Worker ${workerId as number} failed to initialize`)
}
const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
+ this.getWorkerNodeKeyByWorkerId(workerId)
)
- workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctionNames = message.taskFunctionNames
+ workerInfo.ready = ready as boolean
+ workerInfo.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, workerError, data } = message
+ const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey } = promiseResponse
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
- if (
- this.opts.enableTasksQueue === true &&
- this.tasksQueueSize(workerNodeKey) > 0 &&
- this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ if (this.opts.enableTasksQueue === true) {
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ workerNodeTasksUsage.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ if (
+ workerNodeTasksUsage.executing === 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ workerNodeTasksUsage.sequentiallyStolen === 0
+ ) {
+ this.workerNodes[workerNodeKey].dispatchEvent(
+ new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
+ detail: { workerId: workerId as number, workerNodeKey }
+ })
+ )
+ }
}
}
}
import { MessageChannel } from 'node:worker_threads'
import { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
-import {
- DEFAULT_TASK_NAME,
- EMPTY_FUNCTION,
- exponentialDelay,
- getWorkerId,
- getWorkerType,
- sleep
-} from '../utils'
+import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
import { Deque } from '../deque'
import {
type IWorker,
public tasksQueueBackPressureSize: number
private readonly tasksQueue: Deque<Task<Data>>
private onBackPressureStarted: boolean
- private onIdleWorkerNodeCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
this.onBackPressureStarted = false
- this.onIdleWorkerNodeCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
- const task = this.tasksQueue.shift()
- if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
- this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
- }
- return task
+ return this.tasksQueue.shift()
}
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
- const task = this.tasksQueue.pop()
- if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
- this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
- }
- return task
+ return this.tasksQueue.pop()
}
/** @inheritdoc */
return this.taskFunctionsUsage.delete(name)
}
- private async startOnIdleWorkerNode (): Promise<void> {
- if (
- this.onIdleWorkerNodeCount > 0 &&
- (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
- ) {
- this.onIdleWorkerNodeCount = 0
- return
- }
- ++this.onIdleWorkerNodeCount
- this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
- detail: { workerId: this.info.id as number }
- })
- )
- await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
- await this.startOnIdleWorkerNode()
- }
-
- private isIdle (): boolean {
- return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
- }
-
private initWorkerInfo (worker: Worker): WorkerInfo {
return {
id: getWorkerId(worker),
get maxQueued (): number {
return getTasksQueueMaxSize()
},
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
get queued (): number {
return getTaskFunctionQueueSize()
},
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},