refactor: cleanup message passing code
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 SHARE_ENV,
3 Worker,
4 type WorkerOptions,
5 isMainThread
6 } from 'node:worker_threads'
7 import type { MessageValue } from '../../utility-types'
8 import { AbstractPool } from '../abstract-pool'
9 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
10 import { type WorkerType, WorkerTypes } from '../worker'
11
/**
 * Configuration accepted by a poolifier thread pool.
 *
 * Extends the generic pool options with settings specific to
 * `node:worker_threads` workers.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options handed to the `Worker` constructor when the pool creates a thread.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
23
/**
 * Thread pool whose number of worker threads is fixed: its minimum and
 * maximum sizes are both equal to the configured number of workers.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Builds a poolifier thread pool holding a fixed number of threads.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // For a thread pool, "main" means the main thread of the process.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (worker: Worker): Promise<void> {
    // Post the kill message first, then wait until the thread is terminated.
    const killMessage: MessageValue<Data> = { kill: true }
    this.sendToWorker(worker, killMessage)
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // `env` defaults to SHARE_ENV; user supplied worker options are spread
    // afterwards, so they take precedence (including over `env`).
    const { workerOptions: userWorkerOptions } = this.opts
    const workerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...userWorkerOptions
    }
    return new Worker(this.filePath, workerOptions)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    const { fixed } = PoolTypes
    return fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    const { thread } = WorkerTypes
    return thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Fixed pool: the lower bound is the configured number of workers.
    const { numberOfWorkers } = this
    return numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Fixed pool: the upper bound is the configured number of workers.
    const { numberOfWorkers } = this
    return numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    const isBusy = this.internalBusy()
    return isBusy
  }
}