72ac2a1baef359eb635543a2e7ac68dcb080e832
[poolifier.git] / examples / typescript / websocket-server-pool / ws-hybrid / src / websocket-server-worker.ts
1 import {
2 availableParallelism,
3 ClusterWorker,
4 DynamicThreadPool,
5 } from 'poolifier'
6 import { type RawData, WebSocketServer } from 'ws'
7
8 import {
9 type ClusterWorkerData,
10 type ClusterWorkerResponse,
11 type DataPayload,
12 type MessagePayload,
13 MessageType,
14 type ThreadWorkerData,
15 type ThreadWorkerResponse,
16 } from './types.js'
17
18 const emptyFunction = (): void => {
19 /* Intentional */
20 }
21
22 class WebSocketServerWorker extends ClusterWorker<
23 ClusterWorkerData,
24 ClusterWorkerResponse
25 > {
26 private static wss: WebSocketServer
27 private static requestHandlerPool: DynamicThreadPool<
28 ThreadWorkerData<DataPayload>,
29 ThreadWorkerResponse<DataPayload>
30 >
31
32 private static readonly startWebSocketServer = (
33 workerData?: ClusterWorkerData
34 ): ClusterWorkerResponse => {
35 const { port, workerFile, minWorkers, maxWorkers, ...poolOptions } =
36 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
37 workerData!
38
39 WebSocketServerWorker.requestHandlerPool = new DynamicThreadPool<
40 ThreadWorkerData<DataPayload>,
41 ThreadWorkerResponse<DataPayload>
42 >(
43 minWorkers ?? 1,
44 maxWorkers ?? availableParallelism(),
45 workerFile,
46 poolOptions
47 )
48
49 WebSocketServerWorker.wss = new WebSocketServer({ port }, () => {
50 console.info(
51 `⚡️[ws server]: WebSocket server is started in cluster worker at ws://localhost:${port.toString()}/`
52 )
53 })
54
55 WebSocketServerWorker.wss.on('connection', ws => {
56 ws.on('error', console.error)
57 ws.on('message', (message: RawData) => {
58 const { type, data } = JSON.parse(
59 // eslint-disable-next-line @typescript-eslint/no-base-to-string
60 message.toString()
61 ) as MessagePayload<DataPayload>
62 switch (type) {
63 case MessageType.echo:
64 WebSocketServerWorker.requestHandlerPool
65 .execute({ data }, 'echo')
66 .then(response => {
67 ws.send(
68 JSON.stringify({
69 type: MessageType.echo,
70 data: response.data,
71 })
72 )
73 return undefined
74 })
75 .catch(emptyFunction)
76 break
77 case MessageType.factorial:
78 WebSocketServerWorker.requestHandlerPool
79 .execute({ data }, 'factorial')
80 .then(response => {
81 ws.send(
82 JSON.stringify(
83 {
84 type: MessageType.factorial,
85 data: response.data,
86 },
87 (_, v: unknown) =>
88 typeof v === 'bigint' ? v.toString() : v
89 )
90 )
91 return undefined
92 })
93 .catch(emptyFunction)
94 break
95 }
96 })
97 })
98 return {
99 status: true,
100 port: WebSocketServerWorker.wss.options.port,
101 }
102 }
103
104 public constructor () {
105 super(WebSocketServerWorker.startWebSocketServer, {
106 killHandler: async () => {
107 await WebSocketServerWorker.requestHandlerPool.destroy()
108 WebSocketServerWorker.wss.close()
109 },
110 })
111 }
112 }
113
114 export const webSocketServerWorker = new WebSocketServerWorker()