this.checkFilePath(this.filePath)
this.checkPoolOptions(this.opts)
- this.chooseWorker.bind(this)
+ this.chooseWorkerNode.bind(this)
this.internalExecute.bind(this)
this.checkAndEmitFull.bind(this)
this.checkAndEmitBusy.bind(this)
/** @inheritDoc */
public async execute (data: Data): Promise<Response> {
- const [workerNodeKey, worker] = this.chooseWorker()
- const messageId = crypto.randomUUID()
- const res = this.internalExecute(workerNodeKey, worker, messageId)
- this.checkAndEmitFull()
- this.checkAndEmitBusy()
- this.sendToWorker(worker, {
+ const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
- id: messageId
- })
+ id: crypto.randomUUID()
+ }
+ const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+ let currentTask: Task<Data>
+ // FIXME: Add sensible conditions to start tasks queuing on the worker node.
+ if (this.tasksQueueLength(workerNodeKey) > 0) {
+ currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ currentTask = submittedTask
+ }
+ this.sendToWorker(workerNode.worker, currentTask)
+ this.checkAndEmitFull()
+ this.checkAndEmitBusy()
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
*
* The default uses a round robin algorithm to distribute the load.
*
- * @returns [worker node key, worker].
+ * @returns [worker node key, worker node].
*/
- protected chooseWorker (): [number, Worker] {
+ protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
let workerNodeKey: number
if (
this.type === PoolType.DYNAMIC &&
!this.full &&
this.findFreeWorkerNodeKey() === -1
) {
- const createdWorker = this.createAndSetupWorker()
- this.registerWorkerMessageListener(createdWorker, message => {
+ const workerCreated = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(workerCreated, message => {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.getWorkerTasksUsage(createdWorker)?.running === 0)
+ this.getWorkerTasksUsage(workerCreated)?.running === 0)
) {
// Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.destroyWorker(createdWorker)
+ void this.destroyWorker(workerCreated)
}
})
- workerNodeKey = this.getWorkerNodeKey(createdWorker)
+ workerNodeKey = this.getWorkerNodeKey(workerCreated)
} else {
workerNodeKey = this.workerChoiceStrategyContext.execute()
}
- return [workerNodeKey, this.workerNodes[workerNodeKey].worker]
+ return [workerNodeKey, this.workerNodes[workerNodeKey]]
}
/**
private async internalExecute (
workerNodeKey: number,
- worker: Worker,
- messageId: string
+ workerNode: WorkerNode<Worker, Data>,
+ task: Task<Data>
): Promise<Response> {
this.beforePromiseResponseHook(workerNodeKey)
return await new Promise<Response>((resolve, reject) => {
- this.promiseResponseMap.set(messageId, { resolve, reject, worker })
+ this.promiseResponseMap.set(task.id, {
+ resolve,
+ reject,
+ worker: workerNode.worker
+ })
})
}
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext.remove(workerNodeKey)
}
+
+ protected enqueueTask (workerNodeKey: number, task: Task<Data>): void {
+ this.workerNodes[workerNodeKey].tasksQueue.push(task)
+ }
+
+ protected dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].tasksQueue.shift()
+ }
+
+ protected tasksQueueLength (workerNodeKey: number): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.length
+ }
}