this.chooseWorkerNode.bind(this)
this.internalExecute.bind(this)
- this.checkAndEmitFull.bind(this)
- this.checkAndEmitBusy.bind(this)
+ this.checkAndEmitEvents.bind(this)
this.sendToWorker.bind(this)
this.setupHook()
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
this.opts.enableEvents = opts.enableEvents ?? true
+ this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
}
private checkValidWorkerChoiceStrategy (
public abstract get type (): PoolType
/**
- * Number of tasks concurrently running in the pool.
+ * Number of tasks running in the pool.
*/
private get numberOfRunningTasks (): number {
- return this.promiseResponseMap.size
+ return this.workerNodes.reduce(
+ (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running,
+ 0
+ )
+ }
+
+ /**
+ * Number of tasks queued in the pool.
+ */
+ private get numberOfQueuedTasks (): number {
+ if (this.opts.enableTasksQueue === false) {
+ return 0
+ }
+ return this.workerNodes.reduce(
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+ 0
+ )
}
/**
id: crypto.randomUUID()
}
const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
- let currentTask: Task<Data>
- // FIXME: Add sensible conditions to start tasks queuing on the worker node
- if (this.tasksQueueLength(workerNodeKey) > 0) {
- currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+ let currentTask: Task<Data> = submittedTask
+ if (
+ this.opts.enableTasksQueue === true &&
+ (this.busy || this.tasksQueueLength(workerNodeKey) > 0)
+ ) {
this.enqueueTask(workerNodeKey, submittedTask)
- } else {
- currentTask = submittedTask
+ currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
}
this.sendToWorker(workerNode.worker, currentTask)
- this.checkAndEmitFull()
- this.checkAndEmitBusy()
+ this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
public async destroy (): Promise<void> {
await Promise.all(
this.workerNodes.map(async workerNode => {
+ this.flushTasksQueueByWorker(workerNode.worker)
await this.destroyWorker(workerNode.worker)
})
)
(message.kill != null &&
this.getWorkerTasksUsage(workerCreated)?.running === 0)
) {
- // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ this.flushTasksQueueByWorker(workerCreated)
void this.destroyWorker(workerCreated)
}
})
}
/**
- * This function is the listener registered for each worker.
+ * This function is the listener registered for each worker message.
*
* @returns The listener function to execute when a message is received from a worker.
*/
}
this.afterPromiseResponseHook(promiseResponse.worker, message)
this.promiseResponseMap.delete(message.id)
+ const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueLength(workerNodeKey) > 0
+ ) {
+ this.sendToWorker(
+ promiseResponse.worker,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
}
}
}
})
}
- private checkAndEmitBusy (): void {
- if (this.opts.enableEvents === true && this.busy) {
- this.emitter?.emit(PoolEvents.busy)
- }
- }
-
- private checkAndEmitFull (): void {
- if (
- this.type === PoolType.DYNAMIC &&
- this.opts.enableEvents === true &&
- this.full
- ) {
- this.emitter?.emit(PoolEvents.full)
+ private checkAndEmitEvents (): void {
+ if (this.opts.enableEvents === true) {
+ if (this.busy) {
+ this.emitter?.emit(PoolEvents.busy)
+ }
+ if (this.type === PoolType.DYNAMIC && this.full) {
+ this.emitter?.emit(PoolEvents.full)
+ }
}
}
protected tasksQueueLength (workerNodeKey: number): number {
return this.workerNodes[workerNodeKey].tasksQueue.length
}
+
+ protected flushTasksQueue (workerNodeKey: number): void {
+ if (this.tasksQueueLength(workerNodeKey) > 0) {
+ for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
+ this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
+ }
+ this.workerNodes[workerNodeKey].tasksQueue = []
+ }
+ }
+
+ protected flushTasksQueueByWorker (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ this.flushTasksQueue(workerNodeKey)
+ }
}