perf: use worker node key instead of heavy worker reference in hot code path
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 20 Jul 2023 21:44:31 +0000 (23:44 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 20 Jul 2023 21:44:31 +0000 (23:44 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/utility-types.ts

index 77991d7b95b76c67816d01faab881c1711f6c788..00bfaf8eea87f991221664e7cea3059cca4f5457 100644 (file)
@@ -71,10 +71,8 @@ export abstract class AbstractPool<
    *
    * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
    */
-  protected promiseResponseMap: Map<
-  string,
-  PromiseResponseWrapper<Worker, Response>
-  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
+  protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
+    new Map<string, PromiseResponseWrapper<Response>>()
 
   /**
    * Worker choice strategy context referencing a worker choice algorithm implementation.
@@ -617,7 +615,7 @@ export abstract class AbstractPool<
     return await new Promise<Response>((resolve, reject) => {
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
-      const submittedTask: Task<Data> = {
+      const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
@@ -625,10 +623,10 @@ export abstract class AbstractPool<
         workerId: this.getWorkerInfo(workerNodeKey).id as number,
         id: randomUUID()
       }
-      this.promiseResponseMap.set(submittedTask.id as string, {
+      this.promiseResponseMap.set(task.id as string, {
         resolve,
         reject,
-        worker: this.workerNodes[workerNodeKey].worker
+        workerNodeKey
       })
       if (
         this.opts.enableTasksQueue === true &&
@@ -637,9 +635,9 @@ export abstract class AbstractPool<
             ((this.opts.tasksQueueOptions as TasksQueueOptions)
               .concurrency as number))
       ) {
-        this.enqueueTask(workerNodeKey, submittedTask)
+        this.enqueueTask(workerNodeKey, task)
       } else {
-        this.executeTask(workerNodeKey, submittedTask)
+        this.executeTask(workerNodeKey, task)
       }
       this.checkAndEmitEvents()
     })
@@ -709,14 +707,13 @@ export abstract class AbstractPool<
    * Hook executed after the worker task execution.
    * Can be overridden.
    *
-   * @param worker - The worker.
+   * @param workerNodeKey - The worker node key.
    * @param message - The received message.
    */
   protected afterTaskExecutionHook (
-    worker: Worker,
+    workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
     const workerUsage = this.workerNodes[workerNodeKey].usage
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
@@ -1018,9 +1015,9 @@ export abstract class AbstractPool<
       } else {
         promiseResponse.resolve(message.data as Response)
       }
-      this.afterTaskExecutionHook(promiseResponse.worker, message)
+      const workerNodeKey = promiseResponse.workerNodeKey
+      this.afterTaskExecutionHook(workerNodeKey, message)
       this.promiseResponseMap.delete(message.id as string)
-      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0
index 2bc99a7b47f14b86b0f43104723b1735f2eed0a1..28e3af081db65595fb985f39e30a930a43d70620 100644 (file)
@@ -179,7 +179,7 @@ export class WorkerChoiceStrategyContext<
   /**
    * Removes the worker node key from the worker choice strategy in the context.
    *
-   * @param workerNodeKey - The key of the worker node.
+   * @param workerNodeKey - The worker node key.
    * @returns `true` if the removal is successful, `false` otherwise.
    */
   public remove (workerNodeKey: number): boolean {
index 91644713c18d7bcc064ed071a4bb5578bda5ca22..c4a8bf115174d6d60bb114975dfabc81f4a75c9f 100644 (file)
@@ -1,7 +1,6 @@
 import type { EventLoopUtilization } from 'node:perf_hooks'
 import type { MessagePort } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
-import type { IWorker } from './pools/worker'
 
 /**
  * Task error.
@@ -128,14 +127,10 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
 /**
  * An object holding the execution response promise resolve/reject callbacks.
  *
- * @typeParam Worker - Type of worker.
  * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
  * @internal
  */
-export interface PromiseResponseWrapper<
-  Worker extends IWorker,
-  Response = unknown
-> {
+export interface PromiseResponseWrapper<Response = unknown> {
   /**
    * Resolve callback to fulfill the promise.
    */
@@ -145,7 +140,7 @@ export interface PromiseResponseWrapper<
    */
   readonly reject: (reason?: unknown) => void
   /**
-   * The worker handling the execution.
+   * The worker node key handling the execution.
    */
-  readonly worker: Worker
+  readonly workerNodeKey: number
 }