refactor: cleanup internal benchmark code
[poolifier.git] / src / worker / abstract-worker.ts
index b588b181fa1720bb44fc8b76d4dbafe847743364..5390ec029719549aeef5c31791b22e3365eeaa4c 100644 (file)
@@ -1,7 +1,12 @@
 import { AsyncResource } from 'node:async_hooks'
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
-import type { MessageValue } from '../utility-types'
+import type {
+  MessageValue,
+  WorkerAsyncFunction,
+  WorkerFunction,
+  WorkerSyncFunction
+} from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import type { KillBehavior, WorkerOptions } from './worker-options'
 import { KillBehaviors } from './worker-options'
@@ -41,7 +46,7 @@ export abstract class AbstractWorker<
   public constructor (
     type: string,
     protected readonly isMain: boolean,
-    fn: (data: Data) => Response,
+    fn: WorkerFunction<Data, Response>,
     protected mainWorker: MainWorker | undefined | null,
     protected readonly opts: WorkerOptions = {
       /**
@@ -56,8 +61,8 @@ export abstract class AbstractWorker<
     }
   ) {
     super(type)
-    this.checkFunctionInput(fn)
     this.checkWorkerOptions(this.opts)
+    this.checkFunctionInput(fn)
     if (!this.isMain) {
       this.lastTaskTimestamp = performance.now()
       this.aliveInterval = setInterval(
@@ -83,9 +88,9 @@ export abstract class AbstractWorker<
    */
   protected messageListener (
     message: MessageValue<Data, MainWorker>,
-    fn: (data: Data) => Response
+    fn: WorkerFunction<Data, Response>
   ): void {
-    if (message.data != null && message.id != null) {
+    if (message.id != null && message.data != null) {
       // Task message received
       if (this.opts.async === true) {
         this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
@@ -114,11 +119,16 @@ export abstract class AbstractWorker<
    *
    * @param fn - The function that should be defined.
    */
-  private checkFunctionInput (fn: (data: Data) => Response): void {
+  private checkFunctionInput (fn: WorkerFunction<Data, Response>): void {
     if (fn == null) throw new Error('fn parameter is mandatory')
     if (typeof fn !== 'function') {
       throw new TypeError('fn parameter is not a function')
     }
+    if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
+      throw new Error(
+        'fn parameter is an async function, please set the async option to true'
+      )
+    }
   }
 
   /**
@@ -169,7 +179,7 @@ export abstract class AbstractWorker<
    * @param message - Input data for the given function.
    */
   protected run (
-    fn: (data?: Data) => Response,
+    fn: WorkerSyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
     try {
@@ -196,7 +206,7 @@ export abstract class AbstractWorker<
    * @param message - Input data for the given function.
    */
   protected runAsync (
-    fn: (data?: Data) => Promise<Response>,
+    fn: WorkerAsyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
     const startTimestamp = performance.now()