+ public async removeTaskFunction (name: string): Promise<boolean> {
+ if (!this.taskFunctions.has(name)) {
+ throw new Error(
+ 'Cannot remove a task function not handled on the pool side'
+ )
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'remove',
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
+ })
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ this.taskFunctions.delete(name)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return opResult
+ }
+
+ /** @inheritDoc */
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
+ ) {
+ return workerNode.info.taskFunctionsProperties
+ }
+ }
+ return []
+ }
+
+ /**
+ * Gets task function worker choice strategy, if any.
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
+ }
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function priority, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+
+ /**
+ * Gets the worker choice strategies registered in this pool.
+ * @returns The worker choice strategies.
+ */
+ private readonly getWorkerChoiceStrategies =
+ (): Set<WorkerChoiceStrategy> => {
+ return new Set([
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.workerChoiceStrategy!,
+ ...(this.listTaskFunctionsProperties()
+ .map(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.strategy
+ )
+ .filter(
+ (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+ ) as WorkerChoiceStrategy[]),
+ ])
+ }
+
+ /** @inheritDoc */
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'default',
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
+ })
+ }
+
+ private shallExecuteTask (workerNodeKey: number): boolean {
+ return (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
+ )
+ }
+
+ /** @inheritDoc */
+ public async execute (
+ data?: Data,
+ name?: string,
+ transferList?: readonly TransferListItem[]