From 8ff2efc7b8bc4e459a5c545bfc88daa3437d73e8 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 13 Aug 2023 18:31:10 +0200 Subject: [PATCH] refactor: move request handler pool init into cluster worker in ws-hybrid example MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../ws-hybrid/src/request-handler-pool.ts | 26 --- .../ws-hybrid/src/websocket-server-worker.ts | 148 +++++++++++------- 2 files changed, 89 insertions(+), 85 deletions(-) delete mode 100644 examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts diff --git a/examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts b/examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts deleted file mode 100644 index 8fecfba5..00000000 --- a/examples/typescript/websocket-server-pool/ws-hybrid/src/request-handler-pool.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { dirname, extname, join } from 'node:path' -import { fileURLToPath } from 'node:url' -import { DynamicThreadPool, availableParallelism } from 'poolifier' -import { - type DataPayload, - type ThreadWorkerData, - type ThreadWorkerResponse -} from './types.js' - -const requestHandlerWorkerFile = join( - dirname(fileURLToPath(import.meta.url)), - `request-handler-worker${extname(fileURLToPath(import.meta.url))}` -) - -export const requestHandlerPool = new DynamicThreadPool< -ThreadWorkerData, -ThreadWorkerResponse ->(1, Math.round(availableParallelism() / 2), requestHandlerWorkerFile, { - enableTasksQueue: true, - tasksQueueOptions: { - concurrency: 8 - }, - errorHandler: (e: Error) => { - console.error('Thread worker error:', e) - } -}) diff --git a/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts b/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts index dfcba667..71cce531 100644 --- a/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts +++ b/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts @@ -1,79 +1,109 @@ -import { ClusterWorker } from 'poolifier' +import { dirname, extname, join } from 'node:path' +import { fileURLToPath } from 'node:url' +import { + ClusterWorker, + DynamicThreadPool, + availableParallelism +} from 'poolifier' import { type RawData, WebSocketServer } from 'ws' import { type ClusterWorkerData, type ClusterWorkerResponse, type DataPayload, type MessagePayload, - MessageType + MessageType, + type ThreadWorkerData, + type ThreadWorkerResponse } from './types.js' -import { requestHandlerPool } from './request-handler-pool.js' const emptyFunction = (): void => { /** Intentional */ } -const startWebSocketServer = ( - workerData?: ClusterWorkerData -): ClusterWorkerResponse => { - const { port } = workerData as ClusterWorkerData - const wss = new WebSocketServer({ port }, () => { - console.info( - `⚡️[ws server]: WebSocket server is started on cluster worker at ws://localhost:${port}/` - ) - }) - - wss.on('connection', ws => { - ws.on('error', console.error) - ws.on('message', (message: RawData) => { - const { type, data } = JSON.parse( - // eslint-disable-next-line @typescript-eslint/no-base-to-string - message.toString() - ) as MessagePayload - switch (type) { - case MessageType.echo: - requestHandlerPool - .execute({ data }, 'echo') - .then(response => { - ws.send( - JSON.stringify({ - type: MessageType.echo, - data: response.data - }) - ) - return null - }) - .catch(emptyFunction) - break - case MessageType.factorial: - requestHandlerPool - .execute({ data }, 'factorial') - .then(response => { - ws.send( - JSON.stringify({ - type: MessageType.factorial, - data: response.data - }) - ) - return null - }) - .catch(emptyFunction) - break - } - }) - }) - return { - status: true, - port: wss.options.port - } -} - class WebSocketServerWorker extends ClusterWorker< ClusterWorkerData, ClusterWorkerResponse > { + private static readonly startWebSocketServer = ( + workerData?: ClusterWorkerData + ): ClusterWorkerResponse => { + const { port } = workerData as ClusterWorkerData + const wss = new WebSocketServer({ port }, () => { + console.info( + `⚡️[ws server]: WebSocket server is started on cluster worker at ws://localhost:${port}/` + ) + }) + + wss.on('connection', ws => { + ws.on('error', console.error) + ws.on('message', (message: RawData) => { + const { type, data } = JSON.parse( + // eslint-disable-next-line @typescript-eslint/no-base-to-string + message.toString() + ) as MessagePayload + switch (type) { + case MessageType.echo: + this.requestHandlerPool + .execute({ data }, 'echo') + .then(response => { + ws.send( + JSON.stringify({ + type: MessageType.echo, + data: response.data + }) + ) + return null + }) + .catch(emptyFunction) + break + case MessageType.factorial: + this.requestHandlerPool + .execute({ data }, 'factorial') + .then(response => { + ws.send( + JSON.stringify({ + type: MessageType.factorial, + data: response.data + }) + ) + return null + }) + .catch(emptyFunction) + break + } + }) + }) + return { + status: true, + port: wss.options.port + } + } + + private static readonly requestHandlerWorkerFile = join( + dirname(fileURLToPath(import.meta.url)), + `request-handler-worker${extname(fileURLToPath(import.meta.url))}` + ) + + private static readonly requestHandlerPool = new DynamicThreadPool< + ThreadWorkerData, + ThreadWorkerResponse + >( + 1, + Math.round(availableParallelism() / 2), + WebSocketServerWorker.requestHandlerWorkerFile, + { + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 8 + }, + errorHandler: (e: Error) => { + console.error('Thread worker error:', e) + } + } + ) + public constructor () { - super(startWebSocketServer) + super(WebSocketServerWorker.startWebSocketServer) } } -- 2.34.1