DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
+ exponentialDelay,
isKillBehavior,
isPlainObject,
max,
median,
min,
- round
+ round,
+ sleep
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
import type { TaskFunction } from '../worker/task-functions'
import type {
IWorker,
IWorkerNode,
+ TaskStatistics,
WorkerInfo,
+ WorkerNodeEventDetail,
WorkerType,
WorkerUsage
} from './worker'
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
+ /**
+ * Dynamic pool maximum size property placeholder.
+ */
+ protected readonly max?: number
+
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
Response
>
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task functions added at runtime map:
* - `key`: The task function name.
* Whether the pool is starting or not.
*/
private starting: boolean
+ /**
+ * Whether the pool is destroying or not.
+ */
+ private destroying: boolean
+ /**
+ * Whether the pool ready event has been emitted or not.
+ */
+ private readyEventEmitted: boolean
/**
* The start timestamp of the pool.
*/
this.started = false
this.starting = false
+ this.destroying = false
+ this.readyEventEmitted = false
if (this.opts.startWorkers === true) {
this.start()
}
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
if (message.workerId == null) {
throw new Error('Worker message received without worker id')
- } else if (
- message.workerId != null &&
- this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
- ) {
+ } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
`Worker message received from unknown worker '${message.workerId}'`
)
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.unsetTaskStealing()
this.setTaskStealing()
} else {
this.unsetTaskStealing()
}
if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.unsetTasksStealingOnBackPressure()
this.setTasksStealingOnBackPressure()
} else {
this.unsetTasksStealingOnBackPressure()
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].on(
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent
+ )
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onEmptyQueue
+ this.workerNodes[workerNodeKey].off(
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent
+ )
}
}
private setTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].on(
+ 'backPressure',
+ this.handleBackPressureEvent
+ )
}
}
private unsetTasksStealingOnBackPressure (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- delete this.workerNodes[workerNodeKey].onBackPressure
+ this.workerNodes[workerNodeKey].off(
+ 'backPressure',
+ this.handleBackPressureEvent
+ )
}
}
reject(new Error('Cannot execute a task on not started pool'))
return
}
+ if (this.destroying) {
+ reject(new Error('Cannot execute a task on destroying pool'))
+ return
+ }
if (name != null && typeof name !== 'string') {
reject(new TypeError('name argument must be a string'))
return
/** @inheritdoc */
public start (): void {
+ if (this.started) {
+ throw new Error('Cannot start an already started pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot start an already starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot start a destroying pool')
+ }
this.starting = true
while (
this.workerNodes.reduce(
/** @inheritDoc */
public async destroy (): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot destroy an already destroyed pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot destroy an starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot destroy an already destroying pool')
+ }
+ this.destroying = true
await Promise.all(
- this.workerNodes.map(async (_, workerNodeKey) => {
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
+ this.readyEventEmitted = false
+ this.destroying = false
this.started = false
}
)
}
}
+ // FIXME: should be registered only once
this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
this.sendToWorker(workerNodeKey, { kill: true })
})
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('error', error => {
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
- workerInfo.ready = false
- this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
+ this.workerNodes[workerNodeKey].closeChannel()
if (
this.started &&
!this.starting &&
+ !this.destroying &&
this.opts.restartWorkerOnError === true
) {
if (workerInfo.dynamic) {
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
+ // Flag the worker node as not ready immediately
+ this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
*/
protected afterWorkerNodeSetup (workerNodeKey: number): void {
// Listen to worker messages.
- this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ this.workerMessageListener
+ )
// Send the startup message to worker.
this.sendStartupMessageToWorker(workerNodeKey)
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].on(
+ 'idleWorkerNode',
+ this.handleIdleWorkerNodeEvent
+ )
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ this.workerNodes[workerNodeKey].on(
+ 'backPressure',
+ this.handleBackPressureEvent
+ )
}
}
}
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
while (this.tasksQueueSize(workerNodeKey) > 0) {
const destinationWorkerNodeKey = this.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
}
}
- private taskStealingOnEmptyQueue (workerId: number): void {
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ private updateTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.sequentiallyStolen
+ }
+ }
+
+ private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsWorkerUsage (
+ workerNodeKey: number
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerNode?.usage != null) {
+ workerNode.usage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
+ workerNodeKey: number,
+ taskName: string
+ ): void {
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(taskName) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ taskName
+ ) as WorkerUsage
+ taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
+ }
+ }
+
+ private readonly handleIdleWorkerNodeEvent = (
+ eventDetail: WorkerNodeEventDetail,
+ previousStolenTask?: Task<Data>
+ ): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerNodeKey } = eventDetail
+ if (workerNodeKey == null) {
+ throw new Error(
+ 'WorkerNode event detail workerNodeKey attribute must be defined'
+ )
+ }
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ previousStolenTask != null &&
+ workerNodeTasksUsage.sequentiallyStolen > 0 &&
+ (workerNodeTasksUsage.executing > 0 ||
+ this.tasksQueueSize(workerNodeKey) > 0)
+ ) {
+ for (const taskName of this.workerNodes[workerNodeKey].info
+ .taskFunctionNames as string[]) {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ taskName
+ )
+ }
+ this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
+ return
+ }
+ const stolenTask = this.workerNodeStealTask(workerNodeKey)
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ stolenTask != null
+ ) {
+ const taskFunctionTasksWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskFunctionWorkerUsage(stolenTask.name as string)
+ ?.tasks as TaskStatistics
+ if (
+ taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
+ (previousStolenTask != null &&
+ previousStolenTask.name === stolenTask.name &&
+ taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
+ ) {
+ this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ } else {
+ this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
+ workerNodeKey,
+ stolenTask.name as string
+ )
+ }
+ }
+ sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
+ .then(() => {
+ this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ return undefined
+ })
+ .catch(EMPTY_FUNCTION)
+ }
+
+ private readonly workerNodeStealTask = (
+ workerNodeKey: number
+ ): Task<Data> | undefined => {
const workerNodes = this.workerNodes
.slice()
.sort(
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
const sourceWorkerNode = workerNodes.find(
- workerNode =>
- workerNode.info.ready &&
- workerNode.info.id !== workerId &&
- workerNode.usage.tasks.queued > 0
+ (sourceWorkerNode, sourceWorkerNodeKey) =>
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNodeKey !== workerNodeKey &&
+ sourceWorkerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
const task = sourceWorkerNode.popTask() as Task<Data>
- if (this.shallExecuteTask(destinationWorkerNodeKey)) {
- this.executeTask(destinationWorkerNodeKey, task)
+ if (this.shallExecuteTask(workerNodeKey)) {
+ this.executeTask(workerNodeKey, task)
} else {
- this.enqueueTask(destinationWorkerNodeKey, task)
+ this.enqueueTask(workerNodeKey, task)
}
+ this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
this.updateTaskStolenStatisticsWorkerUsage(
- destinationWorkerNodeKey,
+ workerNodeKey,
task.name as string
)
+ return task
}
}
- private tasksStealingOnBackPressure (workerId: number): void {
+ private readonly handleBackPressureEvent = (
+ eventDetail: WorkerNodeEventDetail
+ ): void => {
+ if (this.workerNodes.length <= 1) {
+ return
+ }
+ const { workerId } = eventDetail
const sizeOffset = 1
if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
return
}
/**
- * This method is the listener registered for each worker message.
- *
- * @returns The listener function to execute when a message is received from a worker.
+ * This method is the message listener registered on each worker.
*/
- protected workerListener (): (message: MessageValue<Response>) => void {
- return message => {
- this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctionNames != null) {
- // Worker ready response received from worker
- this.handleWorkerReadyResponse(message)
- } else if (message.taskId != null) {
- // Task execution response received from worker
- this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctionNames != null) {
- // Task function names message received from worker
- this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctionNames = message.taskFunctionNames
- }
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ const { workerId, ready, taskId, taskFunctionNames } = message
+ if (ready != null && taskFunctionNames != null) {
+ // Worker ready response received from worker
+ this.handleWorkerReadyResponse(message)
+ } else if (taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
+ } else if (taskFunctionNames != null) {
+ // Task function names message received from worker
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(workerId)
+ ).taskFunctionNames = taskFunctionNames
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- if (message.ready === false) {
- throw new Error(
- `Worker ${message.workerId as number} failed to initialize`
- )
+ const { workerId, ready, taskFunctionNames } = message
+ if (ready === false) {
+ throw new Error(`Worker ${workerId as number} failed to initialize`)
}
const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(message.workerId)
+ this.getWorkerNodeKeyByWorkerId(workerId)
)
- workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctionNames = message.taskFunctionNames
- if (this.ready) {
+ workerInfo.ready = ready as boolean
+ workerInfo.taskFunctionNames = taskFunctionNames
+ if (!this.readyEventEmitted && this.ready) {
+ this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, workerError, data } = message
+ const { workerId, taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
+ const { resolve, reject, workerNodeKey } = promiseResponse
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- promiseResponse.reject(workerError.message)
+ reject(workerError.message)
} else {
- promiseResponse.resolve(data as Response)
+ resolve(data as Response)
}
- const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
this.workerChoiceStrategyContext.update(workerNodeKey)
this.promiseResponseMap.delete(taskId as string)
- if (
- this.opts.enableTasksQueue === true &&
- this.tasksQueueSize(workerNodeKey) > 0 &&
- this.workerNodes[workerNodeKey].usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
+ if (this.opts.enableTasksQueue === true) {
+ const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
+ if (
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ workerNodeTasksUsage.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ if (
+ workerNodeTasksUsage.executing === 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ workerNodeTasksUsage.sequentiallyStolen === 0
+ ) {
+ this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
+ }
}
}
}
* @returns The worker information.
*/
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
- return this.workerNodes[workerNodeKey].info
+ return this.workerNodes[workerNodeKey]?.info
}
/**
}
}
+ protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+ this.getWorkerInfo(workerNodeKey).ready = false
+ }
+
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return (