+ if (executeTask) {
+ this.executeTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ } else {
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ }
+
+ /**
+ * This method is the listener registered for each worker message.
+ *
+ * @returns The listener function to execute when a message is received from a worker.
+ */
+ protected workerListener (): (message: MessageValue<Response>) => void {
+ return message => {
+ this.checkMessageWorkerId(message)
+ if (message.ready != null) {
+ // Worker ready response received from worker
+ this.handleWorkerReadyResponse(message)
+ } else if (message.id != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
+ }
+ }
+ }
+
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ).ready = message.ready as boolean
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const promiseResponse = this.promiseResponseMap.get(message.id as string)
+ if (promiseResponse != null) {
+ if (message.taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, message.taskError)
+ promiseResponse.reject(message.taskError.message)
+ } else {
+ promiseResponse.resolve(message.data as Response)
+ }
+ const workerNodeKey = promiseResponse.workerNodeKey
+ this.afterTaskExecutionHook(workerNodeKey, message)
+ this.promiseResponseMap.delete(message.id as string)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ this.workerChoiceStrategyContext.update(workerNodeKey)