refactor: cleanups
[poolifier.git] / examples / typescript / websocket-server-pool / ws-hybrid / src / websocket-server-worker.ts
1 import {
2 ClusterWorker,
3 DynamicThreadPool,
4 availableParallelism
5 } from 'poolifier'
6 import { type RawData, WebSocketServer } from 'ws'
7 import {
8 type ClusterWorkerData,
9 type ClusterWorkerResponse,
10 type DataPayload,
11 type MessagePayload,
12 MessageType,
13 type ThreadWorkerData,
14 type ThreadWorkerResponse
15 } from './types.js'
16
17 const emptyFunction = (): void => {
18 /* Intentional */
19 }
20
21 class WebSocketServerWorker extends ClusterWorker<
22 ClusterWorkerData,
23 ClusterWorkerResponse
24 > {
25 private static wss: WebSocketServer
26 private static requestHandlerPool: DynamicThreadPool<
27 ThreadWorkerData<DataPayload>,
28 ThreadWorkerResponse<DataPayload>
29 >
30
31 private static readonly startWebSocketServer = (
32 workerData?: ClusterWorkerData
33 ): ClusterWorkerResponse => {
34 const { port, workerFile, minWorkers, maxWorkers, ...poolOptions } =
35 workerData as ClusterWorkerData
36
37 WebSocketServerWorker.requestHandlerPool = new DynamicThreadPool<
38 ThreadWorkerData<DataPayload>,
39 ThreadWorkerResponse<DataPayload>
40 >(
41 minWorkers ?? 1,
42 maxWorkers ?? availableParallelism(),
43 workerFile,
44 poolOptions
45 )
46
47 WebSocketServerWorker.wss = new WebSocketServer({ port }, () => {
48 console.info(
49 `⚡️[ws server]: WebSocket server is started in cluster worker at ws://localhost:${port}/`
50 )
51 })
52
53 WebSocketServerWorker.wss.on('connection', (ws) => {
54 ws.on('error', console.error)
55 ws.on('message', (message: RawData) => {
56 const { type, data } = JSON.parse(
57 // eslint-disable-next-line @typescript-eslint/no-base-to-string
58 message.toString()
59 ) as MessagePayload<DataPayload>
60 switch (type) {
61 case MessageType.echo:
62 WebSocketServerWorker.requestHandlerPool
63 .execute({ data }, 'echo')
64 .then((response) => {
65 ws.send(
66 JSON.stringify({
67 type: MessageType.echo,
68 data: response.data
69 })
70 )
71 return null
72 })
73 .catch(emptyFunction)
74 break
75 case MessageType.factorial:
76 WebSocketServerWorker.requestHandlerPool
77 .execute({ data }, 'factorial')
78 .then((response) => {
79 ws.send(
80 JSON.stringify({
81 type: MessageType.factorial,
82 data: response.data
83 })
84 )
85 return null
86 })
87 .catch(emptyFunction)
88 break
89 }
90 })
91 })
92 return {
93 status: true,
94 port: WebSocketServerWorker.wss.options.port
95 }
96 }
97
98 public constructor () {
99 super(WebSocketServerWorker.startWebSocketServer, {
100 killHandler: async () => {
101 await WebSocketServerWorker.requestHandlerPool.destroy()
102 WebSocketServerWorker.wss.close()
103 }
104 })
105 }
106 }
107
108 export const webSocketServerWorker = new WebSocketServerWorker()