+ private updateRunTimeTasksUsage (
+ workerTasksUsage: TasksUsage,
+ message: MessageValue<Response>
+ ): void {
+ if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
+ workerTasksUsage.runTime += message.runTime ?? 0
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
+ workerTasksUsage.run !== 0
+ ) {
+ workerTasksUsage.avgRunTime =
+ workerTasksUsage.runTime / workerTasksUsage.run
+ }
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
+ message.runTime != null
+ ) {
+ workerTasksUsage.runTimeHistory.push(message.runTime)
+ workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
+ }
+ }
+ }
+
+ private updateWaitTasksUsage (
+ workerTasksUsage: TasksUsage,
+ message: MessageValue<Response>
+ ): void {
+ if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+ workerTasksUsage.waitTime += message.waitTime ?? 0
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+ workerTasksUsage.run !== 0
+ ) {
+ workerTasksUsage.avgWaitTime =
+ workerTasksUsage.waitTime / workerTasksUsage.run
+ }
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+ message.waitTime != null
+ ) {
+ workerTasksUsage.waitTimeHistory.push(message.waitTime)
+ workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+ }
+ }
+ }
+
+ /**
+ * Chooses a worker node for the next task.
+ *
+ * The default worker choice strategy uses a round robin algorithm to distribute the load.
+ *
+ * @returns The worker node key
+ */
+ protected chooseWorkerNode (): number {
+ let workerNodeKey: number
+ if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
+ const workerCreated = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(workerCreated, message => {
+ const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (message.kill != null &&
+ this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
+ ) {
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ this.flushTasksQueue(currentWorkerNodeKey)
+ void (this.destroyWorker(workerCreated) as Promise<void>)
+ }
+ })
+ workerNodeKey = this.getWorkerNodeKey(workerCreated)
+ } else {
+ workerNodeKey = this.workerChoiceStrategyContext.execute()
+ }
+ return workerNodeKey
+ }
+
+ /**
+ * Sends a message to the given worker.
+ *
+ * @param worker - The worker which should receive the message.
+ * @param message - The message.
+ */
+ protected abstract sendToWorker (
+ worker: Worker,
+ message: MessageValue<Data>