Merge branch 'master' into feature/task-functions
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 17 Sep 2023 13:15:09 +0000 (15:15 +0200)
committerGitHub <noreply@github.com>
Sun, 17 Sep 2023 13:15:09 +0000 (15:15 +0200)
1  2 
CHANGELOG.md
src/pools/abstract-pool.ts
tests/pools/abstract/abstract-pool.test.js

diff --combined CHANGELOG.md
index 570d4e9b2e5eff08336b4a1db9f8a109d1804af8,a6b20e1a0b0ea610fa50af70e616a8a5de12792a..d45b76a15f6e4c105f0d7ca35a593fa2ff325929
@@@ -7,14 -7,18 +7,20 @@@ and this project adheres to [Semantic V
  
  ## [Unreleased]
  
+ ### Fixed
+ - Fix task stealing related tasks queue options handling at runtime.
+ ## [2.6.45] - 2023-09-17
  ### Changed
  
  - Disable publication on GitHub packages registry on release until authentication issue is fixed.
 +- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API.
  
  ### Added
  
 +- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
  - Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
  - Add `start()` method to pool API to start the minimum number of workers.
  - Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
index 6761882589bb14bd9d205c33684ac7e7b3ab5128,eb4c415b4a21be72446bb2da22db83ea9891acb7..0cb4c30ce14ca40b3c2784ffa9cab3e65de0004b
@@@ -21,7 -21,6 +21,7 @@@ import 
    updateMeasurementStatistics
  } from '../utils'
  import { KillBehaviors } from '../worker/worker-options'
 +import type { TaskFunction } from '../worker/task-functions'
  import {
    type IPool,
    PoolEmitter,
@@@ -92,13 -91,6 +92,13 @@@ export abstract class AbstractPool
     */
    protected readonly max?: number
  
 +  /**
 +   * The task functions added at runtime map:
 +   * - `key`: The task function name.
 +   * - `value`: The task function itself.
 +   */
 +  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
 +
    /**
     * Whether the pool is started or not.
     */
  
      this.setupHook()
  
 +    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
 +
      this.started = false
      this.starting = false
      if (this.opts.startWorkers === true) {
     * @param workerId - The worker id.
     * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
     */
 -  private getWorkerNodeKeyByWorkerId (workerId: number): number {
 +  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
      return this.workerNodes.findIndex(
        workerNode => workerNode.info.id === workerId
      )
      tasksQueueOptions?: TasksQueueOptions
    ): void {
      if (this.opts.enableTasksQueue === true && !enable) {
+       this.unsetTaskStealing()
+       this.unsetTasksStealingOnBackPressure()
        this.flushTasksQueues()
      }
      this.opts.enableTasksQueue = enable
        this.opts.tasksQueueOptions =
          this.buildTasksQueueOptions(tasksQueueOptions)
        this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+       if (this.opts.tasksQueueOptions.taskStealing === true) {
+         this.setTaskStealing()
+       } else {
+         this.unsetTaskStealing()
+       }
+       if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+         this.setTasksStealingOnBackPressure()
+       } else {
+         this.unsetTasksStealingOnBackPressure()
+       }
      } else if (this.opts.tasksQueueOptions != null) {
        delete this.opts.tasksQueueOptions
      }
      }
    }
  
+   private setTaskStealing (): void {
+     for (const [workerNodeKey] of this.workerNodes.entries()) {
+       this.workerNodes[workerNodeKey].onEmptyQueue =
+         this.taskStealingOnEmptyQueue.bind(this)
+     }
+   }
+   private unsetTaskStealing (): void {
+     for (const [workerNodeKey] of this.workerNodes.entries()) {
+       delete this.workerNodes[workerNodeKey].onEmptyQueue
+     }
+   }
+   private setTasksStealingOnBackPressure (): void {
+     for (const [workerNodeKey] of this.workerNodes.entries()) {
+       this.workerNodes[workerNodeKey].onBackPressure =
+         this.tasksStealingOnBackPressure.bind(this)
+     }
+   }
+   private unsetTasksStealingOnBackPressure (): void {
+     for (const [workerNodeKey] of this.workerNodes.entries()) {
+       delete this.workerNodes[workerNodeKey].onBackPressure
+     }
+   }
    private buildTasksQueueOptions (
      tasksQueueOptions: TasksQueueOptions
    ): TasksQueueOptions {
      )
    }
  
 +  private async sendTaskFunctionOperationToWorker (
 +    workerNodeKey: number,
 +    message: MessageValue<Data>
 +  ): Promise<boolean> {
 +    const workerId = this.getWorkerInfo(workerNodeKey).id as number
 +    return await new Promise<boolean>((resolve, reject) => {
 +      this.registerWorkerMessageListener(workerNodeKey, message => {
 +        if (
 +          message.workerId === workerId &&
 +          message.taskFunctionOperationStatus === true
 +        ) {
 +          resolve(true)
 +        } else if (
 +          message.workerId === workerId &&
 +          message.taskFunctionOperationStatus === false
 +        ) {
 +          reject(
 +            new Error(
 +              `Task function operation ${
 +                message.taskFunctionOperation as string
 +              } failed on worker ${message.workerId}`
 +            )
 +          )
 +        }
 +      })
 +      this.sendToWorker(workerNodeKey, message)
 +    })
 +  }
 +
 +  private async sendTaskFunctionOperationToWorkers (
 +    message: Omit<MessageValue<Data>, 'workerId'>
 +  ): Promise<boolean> {
 +    return await new Promise<boolean>((resolve, reject) => {
 +      const responsesReceived = new Array<MessageValue<Data | Response>>()
 +      for (const [workerNodeKey] of this.workerNodes.entries()) {
 +        this.registerWorkerMessageListener(workerNodeKey, message => {
 +          if (message.taskFunctionOperationStatus != null) {
 +            responsesReceived.push(message)
 +            if (
 +              responsesReceived.length === this.workerNodes.length &&
 +              responsesReceived.every(
 +                message => message.taskFunctionOperationStatus === true
 +              )
 +            ) {
 +              resolve(true)
 +            } else if (
 +              responsesReceived.length === this.workerNodes.length &&
 +              responsesReceived.some(
 +                message => message.taskFunctionOperationStatus === false
 +              )
 +            ) {
 +              reject(
 +                new Error(
 +                  `Task function operation ${
 +                    message.taskFunctionOperation as string
 +                  } failed on worker ${message.workerId as number}`
 +                )
 +              )
 +            }
 +          }
 +        })
 +        this.sendToWorker(workerNodeKey, message)
 +      }
 +    })
 +  }
 +
 +  /** @inheritDoc */
 +  public hasTaskFunction (name: string): boolean {
 +    for (const workerNode of this.workerNodes) {
 +      if (
 +        Array.isArray(workerNode.info.taskFunctionNames) &&
 +        workerNode.info.taskFunctionNames.includes(name)
 +      ) {
 +        return true
 +      }
 +    }
 +    return false
 +  }
 +
 +  /** @inheritDoc */
 +  public async addTaskFunction (
 +    name: string,
 +    taskFunction: TaskFunction<Data, Response>
 +  ): Promise<boolean> {
 +    this.taskFunctions.set(name, taskFunction)
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'add',
 +      taskFunctionName: name,
 +      taskFunction: taskFunction.toString()
 +    })
 +  }
 +
 +  /** @inheritDoc */
 +  public async removeTaskFunction (name: string): Promise<boolean> {
 +    this.taskFunctions.delete(name)
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'remove',
 +      taskFunctionName: name
 +    })
 +  }
 +
    /** @inheritDoc */
 -  public listTaskFunctions (): string[] {
 +  public listTaskFunctionNames (): string[] {
      for (const workerNode of this.workerNodes) {
        if (
 -        Array.isArray(workerNode.info.taskFunctions) &&
 -        workerNode.info.taskFunctions.length > 0
 +        Array.isArray(workerNode.info.taskFunctionNames) &&
 +        workerNode.info.taskFunctionNames.length > 0
        ) {
 -        return workerNode.info.taskFunctions
 +        return workerNode.info.taskFunctionNames
        }
      }
      return []
    }
  
 +  /** @inheritDoc */
 +  public async setDefaultTaskFunction (name: string): Promise<boolean> {
 +    return await this.sendTaskFunctionOperationToWorkers({
 +      taskFunctionOperation: 'default',
 +      taskFunctionName: name
 +    })
 +  }
 +
    private shallExecuteTask (workerNodeKey: number): boolean {
      return (
        this.tasksQueueSize(workerNodeKey) === 0 &&
          data: data ?? ({} as Data),
          transferList,
          timestamp,
 -        workerId: this.getWorkerInfo(workerNodeKey).id as number,
          taskId: randomUUID()
        }
        this.promiseResponseMap.set(task.taskId as string, {
    }
  
    protected async sendKillMessageToWorker (
 -    workerNodeKey: number,
 -    workerId: number
 +    workerNodeKey: number
    ): Promise<void> {
      await new Promise<void>((resolve, reject) => {
        this.registerWorkerMessageListener(workerNodeKey, message => {
          if (message.kill === 'success') {
            resolve()
          } else if (message.kill === 'failure') {
 -          reject(new Error(`Worker ${workerId} kill message handling failed`))
 +          reject(
 +            new Error(
 +              `Worker ${
 +                message.workerId as number
 +              } kill message handling failed`
 +            )
 +          )
          }
        })
 -      this.sendToWorker(workerNodeKey, { kill: true, workerId })
 +      this.sendToWorker(workerNodeKey, { kill: true })
      })
    }
  
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      return (
        workerInfo != null &&
 -      Array.isArray(workerInfo.taskFunctions) &&
 -      workerInfo.taskFunctions.length > 2
 +      Array.isArray(workerInfo.taskFunctionNames) &&
 +      workerInfo.taskFunctionNames.length > 2
      )
    }
  
      ) {
        --workerTaskStatistics.executing
      }
 -    if (message.taskError == null) {
 +    if (message.workerError == null) {
        ++workerTaskStatistics.executed
      } else {
        ++workerTaskStatistics.failed
      workerUsage: WorkerUsage,
      message: MessageValue<Response>
    ): void {
 -    if (message.taskError != null) {
 +    if (message.workerError != null) {
        return
      }
      updateMeasurementStatistics(
      workerUsage: WorkerUsage,
      message: MessageValue<Response>
    ): void {
 -    if (message.taskError != null) {
 +    if (message.workerError != null) {
        return
      }
      const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
      })
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      this.sendToWorker(workerNodeKey, {
 -      checkActive: true,
 -      workerId: workerInfo.id as number
 +      checkActive: true
      })
 +    if (this.taskFunctions.size > 0) {
 +      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
 +        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
 +          taskFunctionOperation: 'add',
 +          taskFunctionName,
 +          taskFunction: taskFunction.toString()
 +        }).catch(error => {
 +          this.emitter?.emit(PoolEvents.error, error)
 +        })
 +      }
 +    }
      workerInfo.dynamic = true
      if (
        this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
              .runTime.aggregate,
          elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
            .elu.aggregate
 -      },
 -      workerId: this.getWorkerInfo(workerNodeKey).id as number
 +      }
      })
    }
  
          },
          0
        )
 -      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
 -      const task = {
 -        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
 -        workerId: destinationWorkerNode.info.id as number
 -      }
 +      const task = this.dequeueTask(workerNodeKey) as Task<Data>
        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
          this.executeTask(destinationWorkerNodeKey, task)
        } else {
  
    private taskStealingOnEmptyQueue (workerId: number): void {
      const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
 -    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
      const workerNodes = this.workerNodes
        .slice()
        .sort(
          workerNode.usage.tasks.queued > 0
      )
      if (sourceWorkerNode != null) {
 -      const task = {
 -        ...(sourceWorkerNode.popTask() as Task<Data>),
 -        workerId: destinationWorkerNode.info.id as number
 -      }
 +      const task = sourceWorkerNode.popTask() as Task<Data>
        if (this.shallExecuteTask(destinationWorkerNodeKey)) {
          this.executeTask(destinationWorkerNodeKey, task)
        } else {
          workerNode.usage.tasks.queued <
            (this.opts.tasksQueueOptions?.size as number) - sizeOffset
        ) {
 -        const task = {
 -          ...(sourceWorkerNode.popTask() as Task<Data>),
 -          workerId: workerNode.info.id as number
 -        }
 +        const task = sourceWorkerNode.popTask() as Task<Data>
          if (this.shallExecuteTask(workerNodeKey)) {
            this.executeTask(workerNodeKey, task)
          } else {
    protected workerListener (): (message: MessageValue<Response>) => void {
      return message => {
        this.checkMessageWorkerId(message)
 -      if (message.ready != null && message.taskFunctions != null) {
 +      if (message.ready != null && message.taskFunctionNames != null) {
          // Worker ready response received from worker
          this.handleWorkerReadyResponse(message)
        } else if (message.taskId != null) {
          // Task execution response received from worker
          this.handleTaskExecutionResponse(message)
 -      } else if (message.taskFunctions != null) {
 -        // Task functions message received from worker
 +      } else if (message.taskFunctionNames != null) {
 +        // Task function names message received from worker
          this.getWorkerInfo(
            this.getWorkerNodeKeyByWorkerId(message.workerId)
 -        ).taskFunctions = message.taskFunctions
 +        ).taskFunctionNames = message.taskFunctionNames
        }
      }
    }
  
    private handleWorkerReadyResponse (message: MessageValue<Response>): void {
      if (message.ready === false) {
 -      throw new Error(`Worker ${message.workerId} failed to initialize`)
 +      throw new Error(
 +        `Worker ${message.workerId as number} failed to initialize`
 +      )
      }
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.ready = message.ready as boolean
 -    workerInfo.taskFunctions = message.taskFunctions
 +    workerInfo.taskFunctionNames = message.taskFunctionNames
      if (this.emitter != null && this.ready) {
        this.emitter.emit(PoolEvents.ready, this.info)
      }
    }
  
    private handleTaskExecutionResponse (message: MessageValue<Response>): void {
 -    const { taskId, taskError, data } = message
 +    const { taskId, workerError, data } = message
      const promiseResponse = this.promiseResponseMap.get(taskId as string)
      if (promiseResponse != null) {
 -      if (taskError != null) {
 -        this.emitter?.emit(PoolEvents.taskError, taskError)
 -        promiseResponse.reject(taskError.message)
 +      if (workerError != null) {
 +        this.emitter?.emit(PoolEvents.taskError, workerError)
 +        promiseResponse.reject(workerError.message)
        } else {
          promiseResponse.resolve(data as Response)
        }
index 46f48be526ca3832fd9f46dcde13bb9779f86eea,aeede25d7149a9c5918626ae9e860e7733cea4b4..fef19a00a783da5a87ed521abae20bb42d5cb9c5
@@@ -630,6 -630,10 +630,10 @@@ describe('Abstract pool test suite', (
      )
      expect(pool.opts.enableTasksQueue).toBe(false)
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeUndefined()
+       expect(workerNode.onBackPressure).toBeUndefined()
+     }
      pool.enableTasksQueue(true)
      expect(pool.opts.enableTasksQueue).toBe(true)
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        taskStealing: true,
        tasksStealingOnBackPressure: true
      })
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
+       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
+     }
      pool.enableTasksQueue(true, { concurrency: 2 })
      expect(pool.opts.enableTasksQueue).toBe(true)
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        taskStealing: true,
        tasksStealingOnBackPressure: true
      })
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
+       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
+     }
      pool.enableTasksQueue(false)
      expect(pool.opts.enableTasksQueue).toBe(false)
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeUndefined()
+       expect(workerNode.onBackPressure).toBeUndefined()
+     }
      await pool.destroy()
    })
  
        taskStealing: true,
        tasksStealingOnBackPressure: true
      })
-     pool.setTasksQueueOptions({ concurrency: 2 })
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
+       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
+     }
+     pool.setTasksQueueOptions({
+       concurrency: 2,
+       taskStealing: false,
+       tasksStealingOnBackPressure: false
+     })
      expect(pool.opts.tasksQueueOptions).toStrictEqual({
        concurrency: 2,
        size: 4,
+       taskStealing: false,
+       tasksStealingOnBackPressure: false
+     })
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeUndefined()
+       expect(workerNode.onBackPressure).toBeUndefined()
+     }
+     pool.setTasksQueueOptions({
+       concurrency: 1,
+       taskStealing: true,
+       tasksStealingOnBackPressure: true
+     })
+     expect(pool.opts.tasksQueueOptions).toStrictEqual({
+       concurrency: 1,
+       size: 4,
        taskStealing: true,
        tasksStealingOnBackPressure: true
      })
+     for (const workerNode of pool.workerNodes) {
+       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
+       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
+     }
      expect(() =>
        pool.setTasksQueueOptions('invalidTasksQueueOptions')
      ).toThrowError(
        './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
      )
      await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
 -    expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
 +    expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
        DEFAULT_TASK_NAME,
        'jsonIntegerSerialization',
        'factorial',
        './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
      )
      await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
 -    expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
 +    expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
        DEFAULT_TASK_NAME,
        'jsonIntegerSerialization',
        'factorial',
      expect(pool.info.executingTasks).toBe(0)
      expect(pool.info.executedTasks).toBe(4)
      for (const workerNode of pool.workerNodes) {
 -      expect(workerNode.info.taskFunctions).toStrictEqual([
 +      expect(workerNode.info.taskFunctionNames).toStrictEqual([
          DEFAULT_TASK_NAME,
          'jsonIntegerSerialization',
          'factorial',
          'fibonacci'
        ])
        expect(workerNode.taskFunctionsUsage.size).toBe(3)
 -      for (const name of pool.listTaskFunctions()) {
 +      for (const name of pool.listTaskFunctionNames()) {
          expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
            tasks: {
              executed: expect.any(Number),
        expect(
          workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
        ).toStrictEqual(
 -        workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
 +        workerNode.getTaskFunctionWorkerUsage(
 +          workerNode.info.taskFunctionNames[1]
 +        )
        )
      }
      await pool.destroy()