WorkerUsage
} from './worker'
import {
+ Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
'Invalid worker choice strategy options: must have a weight for each worker node'
)
}
+ if (
+ workerChoiceStrategyOptions.measurement != null &&
+ !Object.values(Measurements).includes(
+ workerChoiceStrategyOptions.measurement
+ )
+ ) {
+ throw new Error(
+ `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+ )
+ }
}
private checkValidTasksQueueOptions (
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
- if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
+ ) {
+ throw new TypeError(
+ 'Invalid worker tasks concurrency: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ tasksQueueOptions.concurrency <= 0
+ ) {
throw new Error(
- `Invalid worker tasks concurrency '${
- tasksQueueOptions.concurrency as number
- }'`
+ `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
)
}
}
*/
protected abstract get busy (): boolean
+ /**
+ * Whether worker nodes are executing at least one task.
+ *
+ * @returns Worker nodes busyness boolean status.
+ */
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
} else {
this.executeTask(workerNodeKey, submittedTask)
}
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
/**
- * Shutdowns the given worker.
+ * Terminates the given worker.
*
* @param worker - A worker within `workerNodes`.
*/
/**
* Chooses a worker node for the next task.
*
- * The default worker choice strategy uses a round robin algorithm to distribute the load.
+ * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
*
* @returns The worker node key
*/
- protected chooseWorkerNode (): number {
- let workerNodeKey: number
- if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, message => {
- const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- (message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
- .executing === 0)
- ) {
- // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueue(currentWorkerNodeKey)
- // FIXME: wait for tasks to be finished
- void (this.destroyWorker(workerCreated) as Promise<void>)
- }
- })
- workerNodeKey = this.getWorkerNodeKey(workerCreated)
- } else {
- workerNodeKey = this.workerChoiceStrategyContext.execute()
+ private chooseWorkerNode (): number {
+ if (this.shallCreateDynamicWorker()) {
+ const worker = this.createAndSetupDynamicWorker()
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+ ) {
+ return this.getWorkerNodeKey(worker)
+ }
}
- return workerNodeKey
+ return this.workerChoiceStrategyContext.execute()
+ }
+
+ /**
+ * Conditions for dynamic worker creation.
+ *
+ * @returns Whether to create a dynamic worker or not.
+ */
+ private shallCreateDynamicWorker (): boolean {
+ return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
}
/**
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
- * Returns a newly created worker.
+ * Creates a new worker.
+ *
+ * @returns Newly created worker.
*/
protected abstract createWorker (): Worker
return worker
}
+ /**
+ * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+ *
+ * @returns New, completely set up dynamic worker.
+ */
+ protected createAndSetupDynamicWorker (): Worker {
+ const worker = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(worker, message => {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (message.kill != null &&
+ ((this.opts.enableTasksQueue === false &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0) ||
+ (this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0)))
+ ) {
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ void (this.destroyWorker(worker) as Promise<void>)
+ }
+ })
+ return worker
+ }
+
/**
* This function is the listener registered for each worker message.
*
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
if (message.taskError != null) {
- promiseResponse.reject(message.taskError.message)
if (this.emitter != null) {
this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
} else {
promiseResponse.resolve(message.data as Response)
}
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
}