X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=examples%2Ftypescript%2Fwebsocket-server-pool%2Fws-hybrid%2Fsrc%2Fwebsocket-server-worker.ts;h=2a2c450203eac30ac37dbec05db35ffd469ddc0e;hb=d90d599481965e77ee5bfbf913868220b9fb1395;hp=2418be61d0487e9ce43bda018e41a4b8f5117579;hpb=80ccdab6c5ce5e7e6305c15dd29257d0acc9e302;p=poolifier.git diff --git a/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts b/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts index 2418be61..2a2c4502 100644 --- a/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts +++ b/examples/typescript/websocket-server-pool/ws-hybrid/src/websocket-server-worker.ts @@ -1,5 +1,3 @@ -import { dirname, extname, join } from 'node:path' -import { fileURLToPath } from 'node:url' import { ClusterWorker, DynamicThreadPool, @@ -17,24 +15,42 @@ import { } from './types.js' const emptyFunction = (): void => { - /** Intentional */ + /* Intentional */ } class WebSocketServerWorker extends ClusterWorker< ClusterWorkerData, ClusterWorkerResponse > { + private static wss: WebSocketServer + private static requestHandlerPool: DynamicThreadPool< + ThreadWorkerData, + ThreadWorkerResponse + > + private static readonly startWebSocketServer = ( workerData?: ClusterWorkerData ): ClusterWorkerResponse => { - const { port } = workerData as ClusterWorkerData - const wss = new WebSocketServer({ port }, () => { + const { port, workerFile, minWorkers, maxWorkers, ...poolOptions } = + workerData as ClusterWorkerData + + WebSocketServerWorker.requestHandlerPool = new DynamicThreadPool< + ThreadWorkerData, + ThreadWorkerResponse + >( + minWorkers ?? 1, + maxWorkers ?? availableParallelism(), + workerFile, + poolOptions + ) + + WebSocketServerWorker.wss = new WebSocketServer({ port }, () => { console.info( `⚡️[ws server]: WebSocket server is started in cluster worker at ws://localhost:${port}/` ) }) - wss.on('connection', ws => { + WebSocketServerWorker.wss.on('connection', (ws) => { ws.on('error', console.error) ws.on('message', (message: RawData) => { const { type, data } = JSON.parse( @@ -43,9 +59,9 @@ ClusterWorkerResponse ) as MessagePayload switch (type) { case MessageType.echo: - this.requestHandlerPool + WebSocketServerWorker.requestHandlerPool .execute({ data }, 'echo') - .then(response => { + .then((response) => { ws.send( JSON.stringify({ type: MessageType.echo, @@ -57,9 +73,9 @@ ClusterWorkerResponse .catch(emptyFunction) break case MessageType.factorial: - this.requestHandlerPool + WebSocketServerWorker.requestHandlerPool .execute({ data }, 'factorial') - .then(response => { + .then((response) => { ws.send( JSON.stringify({ type: MessageType.factorial, @@ -75,35 +91,17 @@ ClusterWorkerResponse }) return { status: true, - port: wss.options.port + port: WebSocketServerWorker.wss.options.port } } - private static readonly requestHandlerWorkerFile = join( - dirname(fileURLToPath(import.meta.url)), - `request-handler-worker${extname(fileURLToPath(import.meta.url))}` - ) - - private static readonly requestHandlerPool = new DynamicThreadPool< - ThreadWorkerData, - ThreadWorkerResponse - >( - 1, - Math.round(availableParallelism() / 2), - WebSocketServerWorker.requestHandlerWorkerFile, - { - enableTasksQueue: true, - tasksQueueOptions: { - concurrency: 8 - }, - errorHandler: (e: Error) => { - console.error('Thread worker error:', e) - } - } - ) - public constructor () { - super(WebSocketServerWorker.startWebSocketServer) + super(WebSocketServerWorker.startWebSocketServer, { + killHandler: async () => { + await WebSocketServerWorker.requestHandlerPool.destroy() + WebSocketServerWorker.wss.close() + } + }) } }