refactor: move request handler pool init into cluster worker in
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 13 Aug 2023 16:31:10 +0000 (18:31 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 13 Aug 2023 16:31:10 +0000 (18:31 +0200)
ws-hybrid example

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts [deleted file]
examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts

diff --git a/examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts b/examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts
deleted file mode 100644 (file)
index 8fecfba..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-import { dirname, extname, join } from 'node:path'
-import { fileURLToPath } from 'node:url'
-import { DynamicThreadPool, availableParallelism } from 'poolifier'
-import {
-  type DataPayload,
-  type ThreadWorkerData,
-  type ThreadWorkerResponse
-} from './types.js'
-
-const requestHandlerWorkerFile = join(
-  dirname(fileURLToPath(import.meta.url)),
-  `request-handler-worker${extname(fileURLToPath(import.meta.url))}`
-)
-
-export const requestHandlerPool = new DynamicThreadPool<
-ThreadWorkerData<DataPayload>,
-ThreadWorkerResponse<DataPayload>
->(1, Math.round(availableParallelism() / 2), requestHandlerWorkerFile, {
-  enableTasksQueue: true,
-  tasksQueueOptions: {
-    concurrency: 8
-  },
-  errorHandler: (e: Error) => {
-    console.error('Thread worker error:', e)
-  }
-})
index dfcba66789934c53b05c2f0ec10af81a24c93804..71cce531df287572f6e0c8a2a9d11feb3fde167d 100644 (file)
-import { ClusterWorker } from 'poolifier'
+import { dirname, extname, join } from 'node:path'
+import { fileURLToPath } from 'node:url'
+import {
+  ClusterWorker,
+  DynamicThreadPool,
+  availableParallelism
+} from 'poolifier'
 import { type RawData, WebSocketServer } from 'ws'
 import {
   type ClusterWorkerData,
   type ClusterWorkerResponse,
   type DataPayload,
   type MessagePayload,
-  MessageType
+  MessageType,
+  type ThreadWorkerData,
+  type ThreadWorkerResponse
 } from './types.js'
-import { requestHandlerPool } from './request-handler-pool.js'
 
 const emptyFunction = (): void => {
   /** Intentional */
 }
 
-const startWebSocketServer = (
-  workerData?: ClusterWorkerData
-): ClusterWorkerResponse => {
-  const { port } = workerData as ClusterWorkerData
-  const wss = new WebSocketServer({ port }, () => {
-    console.info(
-      `⚡️[ws server]: WebSocket server is started on cluster worker at ws://localhost:${port}/`
-    )
-  })
-
-  wss.on('connection', ws => {
-    ws.on('error', console.error)
-    ws.on('message', (message: RawData) => {
-      const { type, data } = JSON.parse(
-        // eslint-disable-next-line @typescript-eslint/no-base-to-string
-        message.toString()
-      ) as MessagePayload<DataPayload>
-      switch (type) {
-        case MessageType.echo:
-          requestHandlerPool
-            .execute({ data }, 'echo')
-            .then(response => {
-              ws.send(
-                JSON.stringify({
-                  type: MessageType.echo,
-                  data: response.data
-                })
-              )
-              return null
-            })
-            .catch(emptyFunction)
-          break
-        case MessageType.factorial:
-          requestHandlerPool
-            .execute({ data }, 'factorial')
-            .then(response => {
-              ws.send(
-                JSON.stringify({
-                  type: MessageType.factorial,
-                  data: response.data
-                })
-              )
-              return null
-            })
-            .catch(emptyFunction)
-          break
-      }
-    })
-  })
-  return {
-    status: true,
-    port: wss.options.port
-  }
-}
-
 class WebSocketServerWorker extends ClusterWorker<
 ClusterWorkerData,
 ClusterWorkerResponse
 > {
+  private static readonly startWebSocketServer = (
+    workerData?: ClusterWorkerData
+  ): ClusterWorkerResponse => {
+    const { port } = workerData as ClusterWorkerData
+    const wss = new WebSocketServer({ port }, () => {
+      console.info(
+        `⚡️[ws server]: WebSocket server is started on cluster worker at ws://localhost:${port}/`
+      )
+    })
+
+    wss.on('connection', ws => {
+      ws.on('error', console.error)
+      ws.on('message', (message: RawData) => {
+        const { type, data } = JSON.parse(
+          // eslint-disable-next-line @typescript-eslint/no-base-to-string
+          message.toString()
+        ) as MessagePayload<DataPayload>
+        switch (type) {
+          case MessageType.echo:
+            this.requestHandlerPool
+              .execute({ data }, 'echo')
+              .then(response => {
+                ws.send(
+                  JSON.stringify({
+                    type: MessageType.echo,
+                    data: response.data
+                  })
+                )
+                return null
+              })
+              .catch(emptyFunction)
+            break
+          case MessageType.factorial:
+            this.requestHandlerPool
+              .execute({ data }, 'factorial')
+              .then(response => {
+                ws.send(
+                  JSON.stringify({
+                    type: MessageType.factorial,
+                    data: response.data
+                  })
+                )
+                return null
+              })
+              .catch(emptyFunction)
+            break
+        }
+      })
+    })
+    return {
+      status: true,
+      port: wss.options.port
+    }
+  }
+
+  private static readonly requestHandlerWorkerFile = join(
+    dirname(fileURLToPath(import.meta.url)),
+    `request-handler-worker${extname(fileURLToPath(import.meta.url))}`
+  )
+
+  private static readonly requestHandlerPool = new DynamicThreadPool<
+  ThreadWorkerData<DataPayload>,
+  ThreadWorkerResponse<DataPayload>
+  >(
+    1,
+    Math.round(availableParallelism() / 2),
+    WebSocketServerWorker.requestHandlerWorkerFile,
+    {
+      enableTasksQueue: true,
+      tasksQueueOptions: {
+        concurrency: 8
+      },
+      errorHandler: (e: Error) => {
+        console.error('Thread worker error:', e)
+      }
+    }
+  )
+
   public constructor () {
-    super(startWebSocketServer)
+    super(WebSocketServerWorker.startWebSocketServer)
   }
 }