`taskFunctions` (mandatory) The task function or task functions object `{ name_1: fn_1, ..., name_n: fn_n }` that you want to execute on the worker
`opts` (optional) An object with these properties:
-- `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.
- The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.
+- `maxInactiveTime` (optional) - Maximum waiting time in milliseconds for tasks on newly created workers. After this time newly created workers will die.
+ The last active time of your worker will be updated when it terminates a task.
If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted before completion and removed. The worker is killed if is not part of the minimum size of the pool.
If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
Default: `60000`
-- `killBehavior` (optional) - Dictates if your worker will be deleted in case that a task is active on it.
+- `killBehavior` (optional) - Dictates if your worker will be deleted in case a task is active on it.
**KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted.
**KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted.
This option only apply to the newly created workers.
this.removeWorkerNode(worker)
})
- this.pushWorkerNode(worker)
+ this.addWorkerNode(worker)
this.afterWorkerSetup(worker)
}
})
const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
workerInfo.ready = true
}
- workerInfo.dynamic = true
this.sendToWorker(worker, {
- checkAlive: true,
+ checkActive: true,
workerId: workerInfo.id as number
})
return worker
}
/**
- * Pushes the given worker in the pool worker nodes.
+ * Adds the given worker in the pool worker nodes.
*
* @param worker - The worker.
* @returns The worker nodes length.
*/
- private pushWorkerNode (worker: Worker): number {
+ private addWorkerNode (worker: Worker): number {
const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
// Flag the worker node as ready at pool startup.
if (this.starting) {
}
}
+ /**
+ * Executes the given task on the given worker.
+ *
+ * @param worker - The worker.
+ * @param task - The task to execute.
+ */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
*/
readonly ready?: boolean
/**
- * Whether the worker starts or stops its aliveness check.
+ * Whether the worker starts or stops its activity check.
*/
- readonly checkAlive?: boolean
+ readonly checkActive?: boolean
}
/**
*/
protected statistics!: WorkerStatistics
/**
- * Handler id of the `aliveInterval` worker alive check.
+ * Handler id of the `activeInterval` worker activity check.
*/
- protected aliveInterval?: NodeJS.Timeout
+ protected activeInterval?: NodeJS.Timeout
/**
* Constructs a new poolifier worker.
*
*/
killBehavior: DEFAULT_KILL_BEHAVIOR,
/**
- * The maximum time to keep this worker alive while idle.
+ * The maximum time to keep this worker active while idle.
* The pool automatically checks and terminates this worker when the time expires.
*/
maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME
if (message.workerId === this.id) {
if (message.ready != null) {
// Startup message received
- this.workerReady()
+ this.sendReadyResponse()
} else if (message.statistics != null) {
// Statistics message received
this.statistics = message.statistics
- } else if (message.checkAlive != null) {
- // Check alive message received
- message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive()
+ } else if (message.checkActive != null) {
+ // Check active message received
+ message.checkActive ? this.startCheckActive() : this.stopCheckActive()
} else if (message.id != null && message.data != null) {
// Task message received
this.run(message)
} else if (message.kill === true) {
// Kill message received
- this.stopCheckAlive()
+ this.stopCheckActive()
this.emitDestroy()
}
}
}
/**
- * Notifies the main worker that this worker is ready to process tasks.
+ * Sends to the main worker the ready response.
*/
- protected workerReady (): void {
+ protected sendReadyResponse (): void {
!this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id })
}
/**
- * Starts the worker aliveness check interval.
+ * Starts the worker check active interval.
*/
- private startCheckAlive (): void {
+ private startCheckActive (): void {
this.lastTaskTimestamp = performance.now()
- this.aliveInterval = setInterval(
- this.checkAlive.bind(this),
+ this.activeInterval = setInterval(
+ this.checkActive.bind(this),
(this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
)
}
/**
- * Stops the worker aliveness check interval.
+ * Stops the worker check active interval.
*/
- private stopCheckAlive (): void {
- this.aliveInterval != null && clearInterval(this.aliveInterval)
+ private stopCheckActive (): void {
+ this.activeInterval != null && clearInterval(this.activeInterval)
}
/**
* Checks if the worker should be terminated, because its living too long.
*/
- private checkAlive (): void {
+ private checkActive (): void {
if (
performance.now() - this.lastTaskTimestamp >
(this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
id: task.id
})
} finally {
- if (!this.isMain && this.aliveInterval != null) {
+ if (!this.isMain && this.activeInterval != null) {
this.lastTaskTimestamp = performance.now()
}
}
})
})
.finally(() => {
- if (!this.isMain && this.aliveInterval != null) {
+ if (!this.isMain && this.activeInterval != null) {
this.lastTaskTimestamp = performance.now()
}
})
*/
export interface WorkerOptions {
/**
- * Maximum waiting time in milliseconds for tasks.
+ * Maximum waiting time in milliseconds for tasks on newly created workers.
*
* After this time, newly created workers will be terminated.
- * The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.
+ * The last active time of your worker will be updated when it terminates a task.
*
* - If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool,
- * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
+ * when this timeout expires your tasks is interrupted before completion and removed. The worker is killed if is not part of the minimum size of the pool.
* - If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
*
* @defaultValue 60000
*/
async?: boolean
/**
- * `killBehavior` dictates if your worker will be deleted in case that a task is active on it.
+ * `killBehavior` dictates if your worker will be deleted in case a task is active on it.
*
* - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted.
* - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted.