Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-hybrid...
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 SHARE_ENV,
5 type TransferListItem,
6 Worker,
7 type WorkerOptions,
8 isMainThread
9 } from 'node:worker_threads'
10 import type { MessageValue } from '../../utility-types'
11 import { AbstractPool } from '../abstract-pool'
12 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
13 import { type WorkerType, WorkerTypes } from '../worker'
14
15 /**
16 * Options for a poolifier thread pool.
17 */
18 export interface ThreadPoolOptions extends PoolOptions<Worker> {
19 /**
20 * Worker options.
21 *
22 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
23 */
24 workerOptions?: WorkerOptions
25 }
26
27 /**
28 * A thread pool with a fixed number of threads.
29 *
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
32 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
33 * @since 0.0.1
34 */
35 export class FixedThreadPool<
36 Data = unknown,
37 Response = unknown
38 > extends AbstractPool<Worker, Data, Response> {
39 /**
40 * Constructs a new poolifier fixed thread pool.
41 *
42 * @param numberOfThreads - Number of threads for this pool.
43 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
44 * @param opts - Options for this fixed thread pool.
45 */
46 public constructor (
47 numberOfThreads: number,
48 filePath: string,
49 protected readonly opts: ThreadPoolOptions = {}
50 ) {
51 super(numberOfThreads, filePath, opts)
52 }
53
54 /** @inheritDoc */
55 protected isMain (): boolean {
56 return isMainThread
57 }
58
59 /** @inheritDoc */
60 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
61 this.flagWorkerNodeAsNotReady(workerNodeKey)
62 this.flushTasksQueue(workerNodeKey)
63 // FIXME: wait for tasks to be finished
64 const workerNode = this.workerNodes[workerNodeKey]
65 const worker = workerNode.worker
66 const waitWorkerExit = new Promise<void>(resolve => {
67 worker.once('exit', () => {
68 resolve()
69 })
70 })
71 await this.sendKillMessageToWorker(workerNodeKey)
72 workerNode.closeChannel()
73 workerNode.removeAllListeners()
74 await worker.terminate()
75 await waitWorkerExit
76 }
77
78 /** @inheritDoc */
79 protected sendToWorker (
80 workerNodeKey: number,
81 message: MessageValue<Data>,
82 transferList?: TransferListItem[]
83 ): void {
84 this.workerNodes[workerNodeKey].messageChannel?.port1?.postMessage(
85 { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
86 transferList
87 )
88 }
89
90 /** @inheritDoc */
91 protected sendStartupMessageToWorker (workerNodeKey: number): void {
92 const workerNode = this.workerNodes[workerNodeKey]
93 const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
94 .port2
95 workerNode.worker.postMessage(
96 {
97 ready: false,
98 workerId: this.getWorkerInfo(workerNodeKey).id,
99 port: port2
100 },
101 [port2]
102 )
103 }
104
105 /** @inheritDoc */
106 protected registerWorkerMessageListener<Message extends Data | Response>(
107 workerNodeKey: number,
108 listener: (message: MessageValue<Message>) => void
109 ): void {
110 this.workerNodes[workerNodeKey].messageChannel?.port1?.on(
111 'message',
112 listener
113 )
114 }
115
116 /** @inheritDoc */
117 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
118 workerNodeKey: number,
119 listener: (message: MessageValue<Message>) => void
120 ): void {
121 this.workerNodes[workerNodeKey].messageChannel?.port1?.once(
122 'message',
123 listener
124 )
125 }
126
127 /** @inheritDoc */
128 protected deregisterWorkerMessageListener<Message extends Data | Response>(
129 workerNodeKey: number,
130 listener: (message: MessageValue<Message>) => void
131 ): void {
132 this.workerNodes[workerNodeKey].messageChannel?.port1?.off(
133 'message',
134 listener
135 )
136 }
137
138 /** @inheritDoc */
139 protected createWorker (): Worker {
140 return new Worker(this.filePath, {
141 env: SHARE_ENV,
142 ...this.opts.workerOptions
143 })
144 }
145
146 /** @inheritDoc */
147 protected get type (): PoolType {
148 return PoolTypes.fixed
149 }
150
151 /** @inheritDoc */
152 protected get worker (): WorkerType {
153 return WorkerTypes.thread
154 }
155
156 /** @inheritDoc */
157 protected get busy (): boolean {
158 return this.internalBusy()
159 }
160 }