chore: v3.1.8
[poolifier.git] / src / worker / abstract-worker.ts
index fde55a588e85413d1f6bad59d0abffee25d39c73..e2b16e25495eafb6a70a8e50ac2fa0e208583d0e 100644 (file)
@@ -1,4 +1,3 @@
-import { AsyncResource } from 'node:async_hooks'
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import { performance } from 'node:perf_hooks'
@@ -22,6 +21,11 @@ import type {
   TaskFunctions,
   TaskSyncFunction
 } from './task-functions'
+import {
+  checkTaskFunctionName,
+  checkValidTaskFunctionEntry,
+  checkValidWorkerOptions
+} from './utils'
 
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
@@ -51,7 +55,7 @@ export abstract class AbstractWorker<
   MainWorker extends Worker | MessagePort,
   Data = unknown,
   Response = unknown
-> extends AsyncResource {
+> {
   /**
    * Worker id.
    */
@@ -72,86 +76,37 @@ export abstract class AbstractWorker<
    * Handler id of the `activeInterval` worker activity check.
    */
   protected activeInterval?: NodeJS.Timeout
+
   /**
    * Constructs a new poolifier worker.
    *
-   * @param type - The type of async event.
    * @param isMain - Whether this is the main worker or not.
    * @param mainWorker - Reference to main worker.
    * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
    * @param opts - Options for the worker.
    */
   public constructor (
-    type: string,
     protected readonly isMain: boolean,
     private readonly mainWorker: MainWorker,
     taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
     protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
   ) {
-    super(type)
     if (this.isMain == null) {
       throw new Error('isMain parameter is mandatory')
     }
     this.checkTaskFunctions(taskFunctions)
     this.checkWorkerOptions(this.opts)
     if (!this.isMain) {
+      // Should be once() but Node.js on windows has a bug that prevents it from working
       this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
     }
   }
 
   private checkWorkerOptions (opts: WorkerOptions): void {
-    if (opts != null && !isPlainObject(opts)) {
-      throw new TypeError('opts worker options parameter is not a plain object')
-    }
-    if (
-      opts?.killBehavior != null &&
-      !Object.values(KillBehaviors).includes(opts.killBehavior)
-    ) {
-      throw new TypeError(
-        `killBehavior option '${opts.killBehavior}' is not valid`
-      )
-    }
-    if (
-      opts?.maxInactiveTime != null &&
-      !Number.isSafeInteger(opts.maxInactiveTime)
-    ) {
-      throw new TypeError('maxInactiveTime option is not an integer')
-    }
-    if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) {
-      throw new TypeError(
-        'maxInactiveTime option is not a positive integer greater or equal than 5'
-      )
-    }
-    if (opts?.killHandler != null && typeof opts.killHandler !== 'function') {
-      throw new TypeError('killHandler option is not a function')
-    }
-    if (opts?.async != null) {
-      throw new Error('async option is deprecated')
-    }
+    checkValidWorkerOptions(opts)
     this.opts = { ...DEFAULT_WORKER_OPTIONS, ...opts }
   }
 
-  private checkValidTaskFunctionEntry (
-    name: string,
-    fn: TaskFunction<Data, Response>
-  ): void {
-    if (typeof name !== 'string') {
-      throw new TypeError(
-        'A taskFunctions parameter object key is not a string'
-      )
-    }
-    if (typeof name === 'string' && name.trim().length === 0) {
-      throw new TypeError(
-        'A taskFunctions parameter object key is an empty string'
-      )
-    }
-    if (typeof fn !== 'function') {
-      throw new TypeError(
-        'A taskFunctions parameter object value is not a function'
-      )
-    }
-  }
-
   /**
    * Checks if the `taskFunctions` parameter is passed to the constructor and valid.
    *
@@ -177,7 +132,7 @@ export abstract class AbstractWorker<
     } else if (isPlainObject(taskFunctions)) {
       let firstEntry = true
       for (const [name, fn] of Object.entries(taskFunctions)) {
-        this.checkValidTaskFunctionEntry(name, fn)
+        checkValidTaskFunctionEntry<Data, Response>(name, fn)
         const boundFn = fn.bind(this)
         if (firstEntry) {
           this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
@@ -203,7 +158,7 @@ export abstract class AbstractWorker<
    */
   public hasTaskFunction (name: string): TaskFunctionOperationResult {
     try {
-      this.checkTaskFunctionName(name)
+      checkTaskFunctionName(name)
     } catch (error) {
       return { status: false, error: error as Error }
     }
@@ -223,7 +178,7 @@ export abstract class AbstractWorker<
     fn: TaskFunction<Data, Response>
   ): TaskFunctionOperationResult {
     try {
-      this.checkTaskFunctionName(name)
+      checkTaskFunctionName(name)
       if (name === DEFAULT_TASK_NAME) {
         throw new Error(
           'Cannot add a task function with the default reserved name'
@@ -255,7 +210,7 @@ export abstract class AbstractWorker<
    */
   public removeTaskFunction (name: string): TaskFunctionOperationResult {
     try {
-      this.checkTaskFunctionName(name)
+      checkTaskFunctionName(name)
       if (name === DEFAULT_TASK_NAME) {
         throw new Error(
           'Cannot remove the task function with the default reserved name'
@@ -311,7 +266,7 @@ export abstract class AbstractWorker<
    */
   public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
     try {
-      this.checkTaskFunctionName(name)
+      checkTaskFunctionName(name)
       if (name === DEFAULT_TASK_NAME) {
         throw new Error(
           'Cannot set the default task function reserved name as the default task function'
@@ -333,15 +288,6 @@ export abstract class AbstractWorker<
     }
   }
 
-  private checkTaskFunctionName (name: string): void {
-    if (typeof name !== 'string') {
-      throw new TypeError('name parameter is not a string')
-    }
-    if (typeof name === 'string' && name.trim().length === 0) {
-      throw new TypeError('name parameter is an empty string')
-    }
-  }
-
   /**
    * Handles the ready message sent by the main worker.
    *
@@ -419,21 +365,17 @@ export abstract class AbstractWorker<
    *
    * @param message - The kill message.
    */
-  protected handleKillMessage (message: MessageValue<Data>): void {
+  protected handleKillMessage (_message: MessageValue<Data>): void {
     this.stopCheckActive()
     if (isAsyncFunction(this.opts.killHandler)) {
       (this.opts.killHandler?.() as Promise<void>)
         .then(() => {
           this.sendToMainWorker({ kill: 'success' })
-          return null
+          return undefined
         })
         .catch(() => {
           this.sendToMainWorker({ kill: 'failure' })
         })
-        .finally(() => {
-          this.emitDestroy()
-        })
-        .catch(EMPTY_FUNCTION)
     } else {
       try {
         // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
@@ -441,8 +383,6 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({ kill: 'success' })
       } catch {
         this.sendToMainWorker({ kill: 'failure' })
-      } finally {
-        this.emitDestroy()
       }
     }
   }
@@ -456,7 +396,7 @@ export abstract class AbstractWorker<
   private checkMessageWorkerId (message: MessageValue<Data>): void {
     if (message.workerId == null) {
       throw new Error('Message worker id is not set')
-    } else if (message.workerId != null && message.workerId !== this.id) {
+    } else if (message.workerId !== this.id) {
       throw new Error(
         `Message worker id ${message.workerId} does not match the worker id ${this.id}`
       )
@@ -541,12 +481,11 @@ export abstract class AbstractWorker<
    * Runs the given task.
    *
    * @param task - The task to execute.
-   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
    */
-  protected run (task: Task<Data>): void {
+  protected readonly run = (task: Task<Data>): void => {
     const { name, taskId, data } = task
-    const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
-    if (fn == null) {
+    const taskFunctionName = name ?? DEFAULT_TASK_NAME
+    if (!this.taskFunctions.has(taskFunctionName)) {
       this.sendToMainWorker({
         workerError: {
           name: name as string,
@@ -557,10 +496,11 @@ export abstract class AbstractWorker<
       })
       return
     }
+    const fn = this.taskFunctions.get(taskFunctionName)
     if (isAsyncFunction(fn)) {
-      this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
+      this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
     } else {
-      this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
+      this.runSync(fn as TaskSyncFunction<Data, Response>, task)
     }
   }
 
@@ -570,10 +510,10 @@ export abstract class AbstractWorker<
    * @param fn - Task function that will be executed.
    * @param task - Input data for the task function.
    */
-  protected runSync (
+  protected readonly runSync = (
     fn: TaskSyncFunction<Data, Response>,
     task: Task<Data>
-  ): void {
+  ): void => {
     const { name, taskId, data } = task
     try {
       let taskPerformance = this.beginTaskPerformance(name)
@@ -604,10 +544,10 @@ export abstract class AbstractWorker<
    * @param fn - Task function that will be executed.
    * @param task - Input data for the task function.
    */
-  protected runAsync (
+  protected readonly runAsync = (
     fn: TaskAsyncFunction<Data, Response>,
     task: Task<Data>
-  ): void {
+  ): void => {
     const { name, taskId, data } = task
     let taskPerformance = this.beginTaskPerformance(name)
     fn(data)