docs: cleanups
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 MessageChannel,
3 SHARE_ENV,
4 Worker,
5 type WorkerOptions,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { Draft, MessageValue } from '../../utility-types'
9 import { AbstractPool } from '../abstract-pool'
10 import {
11 type PoolOptions,
12 type PoolType,
13 PoolTypes,
14 type WorkerType,
15 WorkerTypes
16 } from '../pool'
17
/**
 * Configuration accepted by a poolifier thread pool.
 *
 * Adds worker-thread specific settings on top of the generic pool options.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options forwarded to the `Worker` constructor when a thread is created.
   *
   * `FixedThreadPool` merges these over a default of `env: SHARE_ENV`, so
   * any key given here (including `env`) takes precedence.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
29
/**
 * A worker thread that additionally carries the `port1` / `port2` properties
 * of a `MessageChannel`, used to exchange messages between the main thread
 * and the worker thread.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
34
/**
 * A pool backed by a fixed number of worker threads.
 *
 * Tasks can be performed in sync or asynchronous mode, whichever suits the caller.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Creates a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - How many threads this pool holds.
   * @param filePath - Relative or absolute path to a file implementing a `ThreadWorker`.
   * @param opts - Options for this fixed thread pool; defaults to an empty object.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    // Send the kill message first, then wait for the thread to terminate.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // `port2` is assigned in `afterWorkerSetup`; when it is absent no listener is attached.
    const mainThreadPort = worker.port2
    mainThreadPort?.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    // The environment is shared with the parent by default; user-supplied
    // worker options are spread last, so they override that default.
    const options: WorkerOptions = {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    }
    return new Worker(this.filePath, options)
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const { port1: workerSidePort, port2: poolSidePort } = new MessageChannel()
    // Hand one end of the channel to the worker; it is listed in the transfer list.
    worker.postMessage({ parent: workerSidePort }, [workerSidePort])
    // Keep both ports reachable from the worker object.
    worker.port1 = workerSidePort
    worker.port2 = poolSidePort
    // Listen to worker messages.
    this.registerWorkerMessageListener(worker, super.workerListener())
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // A fixed pool has a single size: minimum and maximum are both the configured worker count.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Same value as `minSize`.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}