12c2190e750d36b77a4a8aca912c62ae37711ffa
[poolifier.git] / examples / typescript / websocket-server-pool / ws-hybrid / src / websocket-server-worker.ts
1 import {
2 ClusterWorker,
3 DynamicThreadPool,
4 availableParallelism
5 } from 'poolifier'
6 import { type RawData, WebSocketServer } from 'ws'
7 import {
8 type ClusterWorkerData,
9 type ClusterWorkerResponse,
10 type DataPayload,
11 type MessagePayload,
12 MessageType,
13 type ThreadWorkerData,
14 type ThreadWorkerResponse
15 } from './types.js'
16
17 const emptyFunction = (): void => {
18 /** Intentional */
19 }
20
21 class WebSocketServerWorker extends ClusterWorker<
22 ClusterWorkerData,
23 ClusterWorkerResponse
24 > {
25 private static wss: WebSocketServer
26 private static requestHandlerPool: DynamicThreadPool<
27 ThreadWorkerData<DataPayload>,
28 ThreadWorkerResponse<DataPayload>
29 >
30
31 private static readonly startWebSocketServer = (
32 workerData?: ClusterWorkerData
33 ): ClusterWorkerResponse => {
34 const { port } = workerData as ClusterWorkerData
35 WebSocketServerWorker.wss = new WebSocketServer({ port }, () => {
36 console.info(
37 `⚡️[ws server]: WebSocket server is started in cluster worker at ws://localhost:${port}/`
38 )
39 })
40
41 WebSocketServerWorker.requestHandlerPool = new DynamicThreadPool<
42 ThreadWorkerData<DataPayload>,
43 ThreadWorkerResponse<DataPayload>
44 >(
45 workerData?.minWorkers ?? 1,
46 workerData?.maxWorkers ?? availableParallelism(),
47 workerData?.workerFile as string
48 )
49
50 WebSocketServerWorker.wss.on('connection', (ws) => {
51 ws.on('error', console.error)
52 ws.on('message', (message: RawData) => {
53 const { type, data } = JSON.parse(
54 // eslint-disable-next-line @typescript-eslint/no-base-to-string
55 message.toString()
56 ) as MessagePayload<DataPayload>
57 switch (type) {
58 case MessageType.echo:
59 WebSocketServerWorker.requestHandlerPool
60 .execute({ data }, 'echo')
61 .then((response) => {
62 ws.send(
63 JSON.stringify({
64 type: MessageType.echo,
65 data: response.data
66 })
67 )
68 return null
69 })
70 .catch(emptyFunction)
71 break
72 case MessageType.factorial:
73 WebSocketServerWorker.requestHandlerPool
74 .execute({ data }, 'factorial')
75 .then((response) => {
76 ws.send(
77 JSON.stringify({
78 type: MessageType.factorial,
79 data: response.data
80 })
81 )
82 return null
83 })
84 .catch(emptyFunction)
85 break
86 }
87 })
88 })
89 return {
90 status: true,
91 port: WebSocketServerWorker.wss.options.port
92 }
93 }
94
95 public constructor () {
96 super(WebSocketServerWorker.startWebSocketServer, {
97 killHandler: async () => {
98 await WebSocketServerWorker.requestHandlerPool.destroy()
99 WebSocketServerWorker.wss.close()
100 }
101 })
102 }
103 }
104
105 export const webSocketServerWorker = new WebSocketServerWorker()