- public execute (data: Data): Promise<Response> {
- // Configure worker to handle message with the specified task
- const worker = this.chooseWorker()
- const messageId = ++this.nextMessageId
- const res = this.internalExecute(worker, messageId)
+ public async execute (data: Data): Promise<Response> {
+ const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ id: crypto.randomUUID()
+ }
+ const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+ let currentTask: Task<Data>
+ // FIXME: Add sensible conditions to start tasks queuing on the worker node
+ if (this.tasksQueueLength(workerNodeKey) > 0) {
+ currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ currentTask = submittedTask
+ }
+ this.sendToWorker(workerNode.worker, currentTask)
+ this.checkAndEmitFull()