Add repo vscode extensions recommendation (#99)
[poolifier.git] / src / workers.ts
index 990f2208bc0cd506d12776eaf1397035116294c9..57f2fd95bef547161ca510078a5191dd6bd0ee9b 100644 (file)
@@ -1,6 +1,5 @@
-import { isMainThread, parentPort } from 'worker_threads'
-
 import { AsyncResource } from 'async_hooks'
+import { isMainThread, parentPort } from 'worker_threads'
 
 export interface ThreadWorkerOptions {
   /**
@@ -26,12 +25,13 @@ export interface ThreadWorkerOptions {
  * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
  * @since 0.0.1
  */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
 export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
   protected readonly maxInactiveTime: number
   protected readonly async: boolean
   protected lastTask: number
-  protected readonly interval: NodeJS.Timeout
-  protected parent: any
+  protected readonly interval?: NodeJS.Timeout
+  protected parent?: MessagePort
 
   public constructor (
     fn: (data: Data) => Response,
@@ -39,70 +39,78 @@ export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
   ) {
     super('worker-thread-pool:pioardi')
 
-    this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60
+    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
     this.async = !!this.opts.async
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
     if (!isMainThread) {
       this.interval = setInterval(
-        this._checkAlive.bind(this),
+        this.checkAlive.bind(this),
         this.maxInactiveTime / 2
       )
-      this._checkAlive.bind(this)()
+      this.checkAlive.bind(this)()
     }
-    parentPort.on('message', (value) => {
-      if (value && value.data && value._id) {
-        // here you will receive messages
-        // console.log('This is the main thread ' + isMainThread)
-        if (this.async) {
-          this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
-        } else {
-          this.runInAsyncScope(this._run.bind(this), this, fn, value)
+    parentPort?.on(
+      'message',
+      (value: {
+        data?: Response
+        id?: number
+        parent?: MessagePort
+        kill?: number
+      }) => {
+        if (value?.data && value.id) {
+          // here you will receive messages
+          // console.log('This is the main thread ' + isMainThread)
+          if (this.async) {
+            this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+          } else {
+            this.runInAsyncScope(this.run.bind(this), this, fn, value)
+          }
+        } else if (value.parent) {
+          // save the port to communicate with the main thread
+          // this will be received once
+          this.parent = value.parent
+        } else if (value.kill) {
+          // here is time to kill this thread, just clearing the interval
+          if (this.interval) clearInterval(this.interval)
+          this.emitDestroy()
         }
-      } else if (value.parent) {
-        // save the port to communicate with the main thread
-        // this will be received once
-        this.parent = value.parent
-      } else if (value.kill) {
-        // here is time to kill this thread, just clearing the interval
-        clearInterval(this.interval)
-        this.emitDestroy()
       }
-    })
+    )
   }
 
-  protected _checkAlive (): void {
+  protected checkAlive (): void {
     if (Date.now() - this.lastTask > this.maxInactiveTime) {
-      this.parent.postMessage({ kill: 1 })
+      this.parent?.postMessage({ kill: 1 })
     }
   }
 
-  protected _run (
+  protected run (
     fn: (data: Data) => Response,
-    value: { readonly data: Data, readonly _id: number }
+    value: { readonly data: Data; readonly id: number }
   ): void {
     try {
       const res = fn(value.data)
-      this.parent.postMessage({ data: res, _id: value._id })
+      this.parent?.postMessage({ data: res, id: value.id })
       this.lastTask = Date.now()
     } catch (e) {
-      this.parent.postMessage({ error: e, _id: value._id })
+      this.parent?.postMessage({ error: e, id: value.id })
       this.lastTask = Date.now()
     }
   }
 
-  protected _runAsync (
+  protected runAsync (
     fn: (data: Data) => Promise<Response>,
-    value: { readonly data: Data, readonly _id: number }
+    value: { readonly data: Data; readonly id: number }
   ): void {
     fn(value.data)
-      .then((res) => {
-        this.parent.postMessage({ data: res, _id: value._id })
+      .then(res => {
+        this.parent?.postMessage({ data: res, id: value.id })
         this.lastTask = Date.now()
       })
-      .catch((e) => {
-        this.parent.postMessage({ error: e, _id: value._id })
+      .catch(e => {
+        this.parent?.postMessage({ error: e, id: value.id })
         this.lastTask = Date.now()
       })
   }