feat: conditional task performance computation at the worker level
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 6 Jun 2023 17:56:29 +0000 (19:56 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 6 Jun 2023 17:56:29 +0000 (19:56 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
21 files changed:
CHANGELOG.md
src/index.ts
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
src/worker/worker-functions.ts [new file with mode: 0644]
tests/pools/abstract/abstract-pool.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/worker-choice-strategy-context.test.js

index d8dc4c7a06c8193e2232560ff306bae62e380f95..dc94cb94b0909ce6bde32179231c6d2a058f9e1c 100644 (file)
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add Event Loop Utilization (ELU) statistics to worker tasks usage.
+
+### Changed
+
+- Compute statistics at the worker level only if needed.
+
 ## [2.5.3] - 2023-06-04
 
 ### Changed
index 4431b0f98f2617376df1d82937316633267f16de..993cbe933e6109b28a2984a126bd734d2b7f10f1 100644 (file)
@@ -26,7 +26,7 @@ export type {
 export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
 export type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategy,
   WorkerChoiceStrategyOptions
 } from './pools/selection-strategies/selection-strategies-types'
@@ -40,13 +40,15 @@ export { ThreadWorker } from './worker/thread-worker'
 export { KillBehaviors } from './worker/worker-options'
 export type { KillBehavior, WorkerOptions } from './worker/worker-options'
 export type {
-  Draft,
-  MessageValue,
-  PromiseResponseWrapper,
   TaskFunctions,
   WorkerAsyncFunction,
   WorkerFunction,
   WorkerSyncFunction
+} from './worker/worker-functions'
+export type {
+  Draft,
+  MessageValue,
+  PromiseResponseWrapper
 } from './utility-types'
 export type { CircularArray } from './circular-array'
 export type { Queue } from './queue'
index 065b228ada73e6d21f6fb8238bddf403cc04ac83..ce645cc87af3c9638cf418cb300af4ea18c2dcca 100644 (file)
@@ -95,12 +95,6 @@ export abstract class AbstractPool<
     this.enqueueTask = this.enqueueTask.bind(this)
     this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
-    this.setupHook()
-
-    for (let i = 1; i <= this.numberOfWorkers; i++) {
-      this.createAndSetupWorker()
-    }
-
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
     }
@@ -113,6 +107,12 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy,
       this.opts.workerChoiceStrategyOptions
     )
+
+    this.setupHook()
+
+    for (let i = 1; i <= this.numberOfWorkers; i++) {
+      this.createAndSetupWorker()
+    }
   }
 
   private checkFilePath (filePath: string): void {
@@ -288,6 +288,12 @@ export abstract class AbstractPool<
   ): void {
     this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
     this.opts.workerChoiceStrategy = workerChoiceStrategy
+    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+      this.opts.workerChoiceStrategy
+    )
+    if (workerChoiceStrategyOptions != null) {
+      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+    }
     for (const workerNode of this.workerNodes) {
       this.setWorkerNodeTasksUsage(workerNode, {
         ran: 0,
@@ -303,12 +309,7 @@ export abstract class AbstractPool<
         error: 0,
         elu: undefined
       })
-    }
-    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      this.opts.workerChoiceStrategy
-    )
-    if (workerChoiceStrategyOptions != null) {
-      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+      this.setWorkerStatistics(workerNode.worker)
     }
   }
 
@@ -380,13 +381,13 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data?: Data, name?: string): Promise<Response> {
-    const submissionTimestamp = performance.now()
+    const timestamp = performance.now()
     const workerNodeKey = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
       name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
-      submissionTimestamp,
+      timestamp,
       id: crypto.randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
@@ -483,17 +484,17 @@ export abstract class AbstractPool<
     workerTasksUsage: TasksUsage,
     message: MessageValue<Response>
   ): void {
-    if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
+    if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
       workerTasksUsage.runTime += message.runTime ?? 0
       if (
-        this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
+        this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
         workerTasksUsage.ran !== 0
       ) {
         workerTasksUsage.avgRunTime =
           workerTasksUsage.runTime / workerTasksUsage.ran
       }
       if (
-        this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
+        this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
         message.runTime != null
       ) {
         workerTasksUsage.runTimeHistory.push(message.runTime)
@@ -506,17 +507,17 @@ export abstract class AbstractPool<
     workerTasksUsage: TasksUsage,
     message: MessageValue<Response>
   ): void {
-    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+    if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
       workerTasksUsage.waitTime += message.waitTime ?? 0
       if (
-        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+        this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
         workerTasksUsage.ran !== 0
       ) {
         workerTasksUsage.avgWaitTime =
           workerTasksUsage.waitTime / workerTasksUsage.ran
       }
       if (
-        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+        this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
         message.waitTime != null
       ) {
         workerTasksUsage.waitTimeHistory.push(message.waitTime)
@@ -529,9 +530,8 @@ export abstract class AbstractPool<
     workerTasksUsage: TasksUsage,
     message: MessageValue<Response>
   ): void {
-    if (this.workerChoiceStrategyContext.getRequiredStatistics().elu) {
+    if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
       if (workerTasksUsage.elu != null && message.elu != null) {
-        // TODO: cumulative or delta?
         workerTasksUsage.elu = {
           idle: workerTasksUsage.elu.idle + message.elu.idle,
           active: workerTasksUsage.elu.active + message.elu.active,
@@ -638,6 +638,8 @@ export abstract class AbstractPool<
 
     this.pushWorkerNode(worker)
 
+    this.setWorkerStatistics(worker)
+
     this.afterWorkerSetup(worker)
 
     return worker
@@ -800,4 +802,14 @@ export abstract class AbstractPool<
       this.flushTasksQueue(workerNodeKey)
     }
   }
+
+  private setWorkerStatistics (worker: Worker): void {
+    this.sendToWorker(worker, {
+      statistics: {
+        runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
+        waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
+        elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
+      }
+    })
+  }
 }
index 9b14f3c58430dd143d8f84cf79b7ee635428eafc..285a68c20c856efb15b084f94b593fc7d2a89dc2 100644 (file)
@@ -4,7 +4,7 @@ import type { IPool } from '../pool'
 import type { IWorker } from '../worker'
 import type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -25,7 +25,7 @@ export abstract class AbstractWorkerChoiceStrategy<
    */
   private toggleFindLastFreeWorkerNodeKey: boolean = false
   /** @inheritDoc */
-  public readonly requiredStatistics: RequiredStatistics = {
+  public readonly taskStatistics: TaskStatistics = {
     runTime: false,
     avgRunTime: false,
     medRunTime: false,
@@ -48,22 +48,22 @@ export abstract class AbstractWorkerChoiceStrategy<
     this.choose = this.choose.bind(this)
   }
 
-  protected setRequiredStatistics (opts: WorkerChoiceStrategyOptions): void {
-    if (this.requiredStatistics.avgRunTime && opts.medRunTime === true) {
-      this.requiredStatistics.avgRunTime = false
-      this.requiredStatistics.medRunTime = opts.medRunTime as boolean
+  protected setTaskStatistics (opts: WorkerChoiceStrategyOptions): void {
+    if (this.taskStatistics.avgRunTime && opts.medRunTime === true) {
+      this.taskStatistics.avgRunTime = false
+      this.taskStatistics.medRunTime = opts.medRunTime as boolean
     }
-    if (this.requiredStatistics.medRunTime && opts.medRunTime === false) {
-      this.requiredStatistics.avgRunTime = true
-      this.requiredStatistics.medRunTime = opts.medRunTime as boolean
+    if (this.taskStatistics.medRunTime && opts.medRunTime === false) {
+      this.taskStatistics.avgRunTime = true
+      this.taskStatistics.medRunTime = opts.medRunTime as boolean
     }
-    if (this.requiredStatistics.avgWaitTime && opts.medWaitTime === true) {
-      this.requiredStatistics.avgWaitTime = false
-      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    if (this.taskStatistics.avgWaitTime && opts.medWaitTime === true) {
+      this.taskStatistics.avgWaitTime = false
+      this.taskStatistics.medWaitTime = opts.medWaitTime as boolean
     }
-    if (this.requiredStatistics.medWaitTime && opts.medWaitTime === false) {
-      this.requiredStatistics.avgWaitTime = true
-      this.requiredStatistics.medWaitTime = opts.medWaitTime as boolean
+    if (this.taskStatistics.medWaitTime && opts.medWaitTime === false) {
+      this.taskStatistics.avgWaitTime = true
+      this.taskStatistics.medWaitTime = opts.medWaitTime as boolean
     }
   }
 
@@ -82,7 +82,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   /** @inheritDoc */
   public setOptions (opts: WorkerChoiceStrategyOptions): void {
     opts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-    this.setRequiredStatistics(opts)
+    this.setTaskStatistics(opts)
     this.opts = opts
   }
 
@@ -109,7 +109,7 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @returns The worker task runtime.
    */
   protected getWorkerTaskRunTime (workerNodeKey: number): number {
-    return this.requiredStatistics.medRunTime
+    return this.taskStatistics.medRunTime
       ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
       : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
   }
@@ -123,7 +123,7 @@ export abstract class AbstractWorkerChoiceStrategy<
    * @returns The worker task wait time.
    */
   protected getWorkerWaitTime (workerNodeKey: number): number {
-    return this.requiredStatistics.medWaitTime
+    return this.taskStatistics.medWaitTime
       ? this.pool.workerNodes[workerNodeKey].tasksUsage.medWaitTime
       : this.pool.workerNodes[workerNodeKey].tasksUsage.avgWaitTime
   }
index a0e7ee1a35d2620b956eb9e240192754434097f8..f6fdc4af462823981ad5e646974548b6acbb2951 100644 (file)
@@ -4,7 +4,7 @@ import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -24,7 +24,7 @@ export class FairShareWorkerChoiceStrategy<
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
   /** @inheritDoc */
-  public readonly requiredStatistics: RequiredStatistics = {
+  public readonly taskStatistics: TaskStatistics = {
     runTime: true,
     avgRunTime: true,
     medRunTime: false,
@@ -45,7 +45,7 @@ export class FairShareWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
   }
 
   /** @inheritDoc */
index 7248f03a90ae8f9ec599eb3c062eb6349337c95c..d7d78953036f163ad0b02d4fb06910213e226e31 100644 (file)
@@ -45,7 +45,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
     this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
     this.roundWeights = this.getRoundWeights()
   }
index 2dfb6083d4d85e6a4fe9b58d5a2eddca07e55819..16b2988b93a290e83b9c6954820bbd6c9b5e2727 100644 (file)
@@ -4,7 +4,7 @@ import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -23,7 +23,7 @@ export class LeastBusyWorkerChoiceStrategy<
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
   /** @inheritDoc */
-  public readonly requiredStatistics: RequiredStatistics = {
+  public readonly taskStatistics: TaskStatistics = {
     runTime: true,
     avgRunTime: false,
     medRunTime: false,
@@ -39,7 +39,7 @@ export class LeastBusyWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
   }
 
   /** @inheritDoc */
index 86a1ef23f1871ca85c0376db73b3517666e977ba..d9e3c20e1d56e951238cdbfc47ab2c4698ca999a 100644 (file)
@@ -27,7 +27,7 @@ export class LeastUsedWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
   }
 
   /** @inheritDoc */
index be4fe3326b9585c8bd4f7261023cda4fe5af710e..afdff788bf69eb999c2c4cc4ec08a59692c28fce 100644 (file)
@@ -32,7 +32,7 @@ export class RoundRobinWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
   }
 
   /** @inheritDoc */
index c8497c276f07d06adc81dcb76e7255747203cb0e..4742d069594fed9324d6c880ef44b921593a7622 100644 (file)
@@ -65,7 +65,7 @@ export interface WorkerChoiceStrategyOptions {
  *
  * @internal
  */
-export interface RequiredStatistics {
+export interface TaskStatistics {
   /**
    * Require tasks runtime.
    */
@@ -101,9 +101,9 @@ export interface RequiredStatistics {
  */
 export interface IWorkerChoiceStrategy {
   /**
-   * Required tasks usage statistics.
+   * Required tasks statistics.
    */
-  readonly requiredStatistics: RequiredStatistics
+  readonly taskStatistics: TaskStatistics
   /**
    * Resets strategy internals.
    *
index 1ea091c6f712be7be7472534692581fd3b56de0f..729dbe66ad90e1a723b2e5bdb77e3c4a1d2b8341 100644 (file)
@@ -4,7 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -24,7 +24,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
   /** @inheritDoc */
-  public readonly requiredStatistics: RequiredStatistics = {
+  public readonly taskStatistics: TaskStatistics = {
     runTime: true,
     avgRunTime: true,
     medRunTime: false,
@@ -53,7 +53,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
   ) {
     super(pool, opts)
-    this.setRequiredStatistics(this.opts)
+    this.setTaskStatistics(this.opts)
     this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
   }
 
index 43bcb3adee19a9aaedf072e526f0b59235e55b40..3d32dad68eee8b700a6aed3b2a4811876d978427 100644 (file)
@@ -8,7 +8,7 @@ import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strate
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
-  RequiredStatistics,
+  TaskStatistics,
   WorkerChoiceStrategy,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
@@ -97,16 +97,16 @@ export class WorkerChoiceStrategyContext<
   }
 
   /**
-   * Gets the worker choice strategy in the context required statistics.
+   * Gets the worker choice strategy task statistics in the context.
    *
-   * @returns The required statistics.
+   * @returns The task statistics.
    */
-  public getRequiredStatistics (): RequiredStatistics {
+  public getTaskStatistics (): TaskStatistics {
     return (
       this.workerChoiceStrategies.get(
         this.workerChoiceStrategy
       ) as IWorkerChoiceStrategy
-    ).requiredStatistics
+    ).taskStatistics
   }
 
   /**
index d10a27c9171c978ce44e7b9f6a1d2a9fb66131db..841a2bcb1513ba3dc017d4b599ef61e3329611ab 100644 (file)
@@ -47,9 +47,9 @@ export interface Task<Data = unknown> {
    */
   readonly data?: Data
   /**
-   * Submission timestamp.
+   * Timestamp.
    */
-  readonly submissionTimestamp?: number
+  readonly timestamp?: number
   /**
    * Message UUID.
    */
index cc29801f9afc81e23d25ce6f9e91e49ed60ae831..9cdf6b0a7e82f9d1e3da5fb434537ca8e2d943f9 100644 (file)
@@ -11,6 +11,15 @@ import type { IWorker, Task } from './pools/worker'
  */
 export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
 
+/**
+ * Performance statistics computation.
+ */
+export interface WorkerStatistics {
+  runTime: boolean
+  waitTime: boolean
+  elu: boolean
+}
+
 /**
  * Message object that is passed between main worker and worker.
  *
@@ -50,54 +59,12 @@ export interface MessageValue<
    * Reference to main worker.
    */
   readonly parent?: MainWorker
+  /**
+   * Whether to compute the given statistics or not.
+   */
+  readonly statistics?: WorkerStatistics
 }
 
-/**
- * Worker synchronous function that can be executed.
- *
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of execution response. This can only be serializable data.
- */
-export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
-  data?: Data
-) => Response
-
-/**
- * Worker asynchronous function that can be executed.
- * This function must return a promise.
- *
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of execution response. This can only be serializable data.
- */
-export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
-  data?: Data
-) => Promise<Response>
-
-/**
- * Worker function that can be executed.
- * This function can be synchronous or asynchronous.
- *
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of execution response. This can only be serializable data.
- */
-export type WorkerFunction<Data = unknown, Response = unknown> =
-  | WorkerSyncFunction<Data, Response>
-  | WorkerAsyncFunction<Data, Response>
-
-/**
- * Worker functions that can be executed.
- * This object can contain synchronous or asynchronous functions.
- * The key is the name of the function.
- * The value is the function itself.
- *
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of execution response. This can only be serializable data.
- */
-export type TaskFunctions<Data = unknown, Response = unknown> = Record<
-string,
-WorkerFunction<Data, Response>
->
-
 /**
  * An object holding the execution response promise resolve/reject callbacks.
  *
index a484dce12927c1cd29721e9529aa08a822c37b43..f40acf3786540eb350576612c068c4e851968449 100644 (file)
@@ -2,29 +2,32 @@ import { AsyncResource } from 'node:async_hooks'
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import { type EventLoopUtilization, performance } from 'node:perf_hooks'
-import type {
-  MessageValue,
-  TaskFunctions,
-  WorkerAsyncFunction,
-  WorkerFunction,
-  WorkerSyncFunction
-} from '../utility-types'
+import type { MessageValue, WorkerStatistics } from '../utility-types'
 import { EMPTY_FUNCTION, isPlainObject } from '../utils'
 import {
   type KillBehavior,
   KillBehaviors,
   type WorkerOptions
 } from './worker-options'
+import type {
+  TaskFunctions,
+  WorkerAsyncFunction,
+  WorkerFunction,
+  WorkerSyncFunction
+} from './worker-functions'
 
 const DEFAULT_FUNCTION_NAME = 'default'
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
 
+/**
+ * Task performance.
+ */
 interface TaskPerformance {
   timestamp: number
-  waitTime: number
+  waitTime?: number
   runTime?: number
-  elu: EventLoopUtilization
+  elu?: EventLoopUtilization
 }
 
 /**
@@ -47,6 +50,10 @@ export abstract class AbstractWorker<
    * Timestamp of the last task processed by this worker.
    */
   protected lastTaskTimestamp!: number
+  /**
+   * Performance statistics computation.
+   */
+  protected statistics!: WorkerStatistics
   /**
    * Handler id of the `aliveInterval` worker alive check.
    */
@@ -90,7 +97,6 @@ export abstract class AbstractWorker<
       )
       this.checkAlive.bind(this)()
     }
-
     this.mainWorker?.on('message', this.messageListener.bind(this))
   }
 
@@ -162,6 +168,9 @@ export abstract class AbstractWorker<
       // Kill message received
       this.aliveInterval != null && clearInterval(this.aliveInterval)
       this.emitDestroy()
+    } else if (message.statistics != null) {
+      // Statistics message received
+      this.statistics = message.statistics
     }
   }
 
@@ -292,22 +301,25 @@ export abstract class AbstractWorker<
   }
 
   private beforeTaskRunHook (message: MessageValue<Data>): TaskPerformance {
-    // TODO: conditional accounting
     const timestamp = performance.now()
     return {
       timestamp,
-      waitTime: timestamp - (message.submissionTimestamp ?? 0),
-      elu: performance.eventLoopUtilization()
+      ...(this.statistics.waitTime && {
+        waitTime: timestamp - (message.timestamp ?? timestamp)
+      }),
+      ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
     }
   }
 
   private afterTaskRunHook (taskPerformance: TaskPerformance): TaskPerformance {
     return {
       ...taskPerformance,
-      ...{
-        runTime: performance.now() - taskPerformance.timestamp,
+      ...(this.statistics.runTime && {
+        runTime: performance.now() - taskPerformance.timestamp
+      }),
+      ...(this.statistics.elu && {
         elu: performance.eventLoopUtilization(taskPerformance.elu)
-      }
+      })
     }
   }
 }
index 0ff15cced17931de93638f41d38803ee59a8116b..2a3961b37f02836d52c7972f98ee150c02904281 100644 (file)
@@ -1,11 +1,8 @@
 import cluster, { type Worker } from 'node:cluster'
-import type {
-  MessageValue,
-  TaskFunctions,
-  WorkerFunction
-} from '../utility-types'
+import type { MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
+import type { TaskFunctions, WorkerFunction } from './worker-functions'
 
 /**
  * A cluster worker used by a poolifier `ClusterPool`.
index ded967f106189ef9515a35356a29274d01ca6595..81d98aa5c2fed44c67f84f1ea8145d24e4e3b676 100644 (file)
@@ -1,11 +1,8 @@
 import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads'
-import type {
-  MessageValue,
-  TaskFunctions,
-  WorkerFunction
-} from '../utility-types'
+import type { MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
+import type { TaskFunctions, WorkerFunction } from './worker-functions'
 
 /**
  * A thread worker used by a poolifier `ThreadPool`.
diff --git a/src/worker/worker-functions.ts b/src/worker/worker-functions.ts
new file mode 100644 (file)
index 0000000..28b9ed5
--- /dev/null
@@ -0,0 +1,45 @@
+/**
+ * Worker synchronous function that can be executed.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
+  data?: Data
+) => Response
+
+/**
+ * Worker asynchronous function that can be executed.
+ * This function must return a promise.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
+  data?: Data
+) => Promise<Response>
+
+/**
+ * Worker function that can be executed.
+ * This function can be synchronous or asynchronous.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type WorkerFunction<Data = unknown, Response = unknown> =
+  | WorkerSyncFunction<Data, Response>
+  | WorkerAsyncFunction<Data, Response>
+
+/**
+ * Worker functions that can be executed.
+ * This object can contain synchronous or asynchronous functions.
+ * The key is the name of the function.
+ * The value is the function itself.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type TaskFunctions<Data = unknown, Response = unknown> = Record<
+string,
+WorkerFunction<Data, Response>
+>
index b88e489b7f05065b6ad780f454789b853cb8e85f..e85f112797c3267c79f11bf5d43132b2bbe85ff6 100644 (file)
@@ -193,9 +193,7 @@ describe('Abstract pool test suite', () => {
         medWaitTime: false
       })
     }
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
@@ -212,9 +210,7 @@ describe('Abstract pool test suite', () => {
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
     }
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: false,
       medRunTime: true,
@@ -231,9 +227,7 @@ describe('Abstract pool test suite', () => {
       .workerChoiceStrategies) {
       expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
     }
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
index e9f5983e9dac1ad5cfbf75d84fa0a7ef715de379..eedee42765e25c475310b10a07e4fb50fee097fc 100644 (file)
@@ -121,9 +121,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
@@ -139,9 +137,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
@@ -304,9 +300,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
@@ -322,9 +316,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
@@ -411,9 +403,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: false,
       medRunTime: false,
@@ -429,9 +419,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: false,
       medRunTime: false,
@@ -524,9 +512,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
@@ -542,9 +528,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
@@ -765,9 +749,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
@@ -783,9 +765,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: true,
       avgRunTime: true,
       medRunTime: false,
@@ -1032,9 +1012,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
@@ -1050,9 +1028,7 @@ describe('Selection strategies test suite', () => {
       './tests/worker-files/thread/testWorker.js',
       { workerChoiceStrategy }
     )
-    expect(
-      pool.workerChoiceStrategyContext.getRequiredStatistics()
-    ).toStrictEqual({
+    expect(pool.workerChoiceStrategyContext.getTaskStatistics()).toStrictEqual({
       runTime: false,
       avgRunTime: false,
       medRunTime: false,
index d40fae4e028d8affa6375375b4f6d0d71eeafc47..5e202039f67ba4872c594294d480ed8cecb90da6 100644 (file)
@@ -368,10 +368,10 @@ describe('Worker choice strategy context test suite', () => {
         medRunTime: true
       }
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe(
       false
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe(
       true
     )
     workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
@@ -381,10 +381,10 @@ describe('Worker choice strategy context test suite', () => {
         medRunTime: true
       }
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe(
       false
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe(
       true
     )
     const fsWorkerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
@@ -395,10 +395,10 @@ describe('Worker choice strategy context test suite', () => {
         medRunTime: true
       }
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe(
       false
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe(
       true
     )
     workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
@@ -408,10 +408,10 @@ describe('Worker choice strategy context test suite', () => {
         medRunTime: true
       }
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().avgRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().avgRunTime).toBe(
       false
     )
-    expect(workerChoiceStrategyContext.getRequiredStatistics().medRunTime).toBe(
+    expect(workerChoiceStrategyContext.getTaskStatistics().medRunTime).toBe(
       true
     )
   })