Merge branch 'master' into elu-strategy
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 MessageChannel,
3 SHARE_ENV,
4 Worker,
5 type WorkerOptions,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { Draft, MessageValue } from '../../utility-types'
9 import { AbstractPool } from '../abstract-pool'
10 import {
11 type PoolOptions,
12 type PoolType,
13 PoolTypes,
14 type WorkerType,
15 WorkerTypes
16 } from '../pool'
17
18 /**
19 * Options for a poolifier thread pool.
20 */
21 export interface ThreadPoolOptions extends PoolOptions<Worker> {
22 /**
23 * Worker options.
24 *
25 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
26 */
27 workerOptions?: WorkerOptions
28 }
29
30 /**
31 * A thread worker with message channels for communication between main thread and thread worker.
32 */
33 export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
34
35 /**
36 * A thread pool with a fixed number of threads.
37 *
38 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
39 *
40 * This pool selects the threads in a round robin fashion.
41 *
42 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
43 * @typeParam Response - Type of execution response. This can only be serializable data.
44 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
45 * @since 0.0.1
46 */
47 export class FixedThreadPool<
48 Data = unknown,
49 Response = unknown
50 > extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
51 /**
52 * Constructs a new poolifier fixed thread pool.
53 *
54 * @param numberOfThreads - Number of threads for this pool.
55 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
56 * @param opts - Options for this fixed thread pool.
57 */
58 public constructor (
59 numberOfThreads: number,
60 filePath: string,
61 protected readonly opts: ThreadPoolOptions = {}
62 ) {
63 super(numberOfThreads, filePath, opts)
64 }
65
66 /** @inheritDoc */
67 protected isMain (): boolean {
68 return isMainThread
69 }
70
71 /** @inheritDoc */
72 protected async destroyWorker (
73 worker: ThreadWorkerWithMessageChannel
74 ): Promise<void> {
75 this.sendToWorker(worker, { kill: 1 })
76 await worker.terminate()
77 }
78
79 /** @inheritDoc */
80 protected sendToWorker (
81 worker: ThreadWorkerWithMessageChannel,
82 message: MessageValue<Data>
83 ): void {
84 worker.postMessage(message)
85 }
86
87 /** @inheritDoc */
88 protected registerWorkerMessageListener<Message extends Data | Response>(
89 worker: ThreadWorkerWithMessageChannel,
90 listener: (message: MessageValue<Message>) => void
91 ): void {
92 worker.port2?.on('message', listener)
93 }
94
95 /** @inheritDoc */
96 protected createWorker (): ThreadWorkerWithMessageChannel {
97 return new Worker(this.filePath, {
98 env: SHARE_ENV,
99 ...this.opts.workerOptions
100 })
101 }
102
103 /** @inheritDoc */
104 protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
105 const { port1, port2 } = new MessageChannel()
106 worker.postMessage({ parent: port1 }, [port1])
107 worker.port1 = port1
108 worker.port2 = port2
109 // Listen to worker messages.
110 this.registerWorkerMessageListener(worker, super.workerListener())
111 }
112
113 /** @inheritDoc */
114 protected get type (): PoolType {
115 return PoolTypes.fixed
116 }
117
118 /** @inheritDoc */
119 protected get worker (): WorkerType {
120 return WorkerTypes.thread
121 }
122
123 /** @inheritDoc */
124 protected get minSize (): number {
125 return this.numberOfWorkers
126 }
127
128 /** @inheritDoc */
129 protected get maxSize (): number {
130 return this.numberOfWorkers
131 }
132
133 /** @inheritDoc */
134 protected get busy (): boolean {
135 return this.internalBusy()
136 }
137 }