fix: readd ThreadPoolOptions and ClusterPoolOptions TS type aliases to PoolOptions
[poolifier.git] / examples / typescript / websocket-server-pool / ws-hybrid / src / websocket-server-worker.ts
index 2418be61d0487e9ce43bda018e41a4b8f5117579..46745d9657cc9f700fc3f9ad79789b8323bf0715 100644 (file)
@@ -1,5 +1,3 @@
-import { dirname, extname, join } from 'node:path'
-import { fileURLToPath } from 'node:url'
 import {
   ClusterWorker,
   DynamicThreadPool,
@@ -17,24 +15,42 @@ import {
 } from './types.js'
 
 const emptyFunction = (): void => {
-  /** Intentional */
+  /* Intentional */
 }
 
 class WebSocketServerWorker extends ClusterWorker<
 ClusterWorkerData,
 ClusterWorkerResponse
 > {
+  private static wss: WebSocketServer
+  private static requestHandlerPool: DynamicThreadPool<
+  ThreadWorkerData<DataPayload>,
+  ThreadWorkerResponse<DataPayload>
+  >
+
   private static readonly startWebSocketServer = (
     workerData?: ClusterWorkerData
   ): ClusterWorkerResponse => {
-    const { port } = workerData as ClusterWorkerData
-    const wss = new WebSocketServer({ port }, () => {
+    const { port, workerFile, minWorkers, maxWorkers, ...poolOptions } =
+      workerData as ClusterWorkerData
+
+    WebSocketServerWorker.requestHandlerPool = new DynamicThreadPool<
+    ThreadWorkerData<DataPayload>,
+    ThreadWorkerResponse<DataPayload>
+    >(
+      minWorkers ?? 1,
+      maxWorkers ?? availableParallelism(),
+      workerFile,
+      poolOptions
+    )
+
+    WebSocketServerWorker.wss = new WebSocketServer({ port }, () => {
       console.info(
         `⚡️[ws server]: WebSocket server is started in cluster worker at ws://localhost:${port}/`
       )
     })
 
-    wss.on('connection', ws => {
+    WebSocketServerWorker.wss.on('connection', ws => {
       ws.on('error', console.error)
       ws.on('message', (message: RawData) => {
         const { type, data } = JSON.parse(
@@ -43,7 +59,7 @@ ClusterWorkerResponse
         ) as MessagePayload<DataPayload>
         switch (type) {
           case MessageType.echo:
-            this.requestHandlerPool
+            WebSocketServerWorker.requestHandlerPool
               .execute({ data }, 'echo')
               .then(response => {
                 ws.send(
@@ -52,12 +68,12 @@ ClusterWorkerResponse
                     data: response.data
                   })
                 )
-                return null
+                return undefined
               })
               .catch(emptyFunction)
             break
           case MessageType.factorial:
-            this.requestHandlerPool
+            WebSocketServerWorker.requestHandlerPool
               .execute({ data }, 'factorial')
               .then(response => {
                 ws.send(
@@ -66,7 +82,7 @@ ClusterWorkerResponse
                     data: response.data
                   })
                 )
-                return null
+                return undefined
               })
               .catch(emptyFunction)
             break
@@ -75,35 +91,17 @@ ClusterWorkerResponse
     })
     return {
       status: true,
-      port: wss.options.port
+      port: WebSocketServerWorker.wss.options.port
     }
   }
 
-  private static readonly requestHandlerWorkerFile = join(
-    dirname(fileURLToPath(import.meta.url)),
-    `request-handler-worker${extname(fileURLToPath(import.meta.url))}`
-  )
-
-  private static readonly requestHandlerPool = new DynamicThreadPool<
-  ThreadWorkerData<DataPayload>,
-  ThreadWorkerResponse<DataPayload>
-  >(
-    1,
-    Math.round(availableParallelism() / 2),
-    WebSocketServerWorker.requestHandlerWorkerFile,
-    {
-      enableTasksQueue: true,
-      tasksQueueOptions: {
-        concurrency: 8
-      },
-      errorHandler: (e: Error) => {
-        console.error('Thread worker error:', e)
-      }
-    }
-  )
-
   public constructor () {
-    super(WebSocketServerWorker.startWebSocketServer)
+    super(WebSocketServerWorker.startWebSocketServer, {
+      killHandler: async () => {
+        await WebSocketServerWorker.requestHandlerPool.destroy()
+        WebSocketServerWorker.wss.close()
+      }
+    })
   }
 }