- public execute (data: Data): Promise<Response> {
- // Configure worker to handle message with the specified task
- const worker = this.chooseWorker()
- const messageId = ++this.nextMessageId
- const res = this.internalExecute(worker, messageId)
- this.checkAndEmitBusy()
- data = data ?? ({} as Data)
- this.sendToWorker(worker, { data, id: messageId })
+ public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ if (this.opts.enableTasksQueue === true) {
+ this.checkValidTasksQueueOptions(tasksQueueOptions)
+ this.opts.tasksQueueOptions =
+ this.buildTasksQueueOptions(tasksQueueOptions)
+ } else {
+ delete this.opts.tasksQueueOptions
+ }
+ }
+
+ private buildTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): TasksQueueOptions {
+ return {
+ concurrency: tasksQueueOptions?.concurrency ?? 1
+ }
+ }
+
+ /**
+ * Whether the pool is full or not.
+ *
+ * The pool filling boolean status.
+ */
+ protected abstract get full (): boolean
+
+ /**
+ * Whether the pool is busy or not.
+ *
+ * The pool busyness boolean status.
+ */
+ protected abstract get busy (): boolean
+
+ protected internalBusy (): boolean {
+ return (
+ this.workerNodes.findIndex(workerNode => {
+ return workerNode.tasksUsage.running === 0
+ }) === -1
+ )
+ }
+
+ /** @inheritDoc */
+ public async execute (data?: Data, name?: string): Promise<Response> {
+ const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
+ name,
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ id: crypto.randomUUID()
+ }
+ const res = new Promise<Response>((resolve, reject) => {
+ this.promiseResponseMap.set(submittedTask.id as string, {
+ resolve,
+ reject,
+ worker: workerNode.worker
+ })
+ })
+ if (
+ this.opts.enableTasksQueue === true &&
+ (this.busy ||
+ this.workerNodes[workerNodeKey].tasksUsage.running >=
+ ((this.opts.tasksQueueOptions as TasksQueueOptions)
+ .concurrency as number))
+ ) {
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ this.executeTask(workerNodeKey, submittedTask)
+ }
+ this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.checkAndEmitEvents()
+ // eslint-disable-next-line @typescript-eslint/return-await