refactor: factor out worker communication channel closing
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 SHARE_ENV,
5 Worker,
6 type WorkerOptions,
7 isMainThread
8 } from 'node:worker_threads'
9 import type { MessageValue } from '../../utility-types'
10 import { AbstractPool } from '../abstract-pool'
11 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
12 import { type WorkerType, WorkerTypes } from '../worker'
13
14 /**
15 * Options for a poolifier thread pool.
16 */
17 export interface ThreadPoolOptions extends PoolOptions<Worker> {
18 /**
19 * Worker options.
20 *
21 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
22 */
23 workerOptions?: WorkerOptions
24 }
25
26 /**
27 * A thread pool with a fixed number of threads.
28 *
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
31 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
32 * @since 0.0.1
33 */
34 export class FixedThreadPool<
35 Data = unknown,
36 Response = unknown
37 > extends AbstractPool<Worker, Data, Response> {
38 /**
39 * Constructs a new poolifier fixed thread pool.
40 *
41 * @param numberOfThreads - Number of threads for this pool.
42 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
43 * @param opts - Options for this fixed thread pool.
44 */
45 public constructor (
46 numberOfThreads: number,
47 filePath: string,
48 protected readonly opts: ThreadPoolOptions = {}
49 ) {
50 super(numberOfThreads, filePath, opts)
51 }
52
53 /** @inheritDoc */
54 protected isMain (): boolean {
55 return isMainThread
56 }
57
58 /** @inheritDoc */
59 protected async destroyWorker (worker: Worker): Promise<void> {
60 this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
61 this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel()
62 await worker.terminate()
63 }
64
65 /** @inheritDoc */
66 protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
67 (
68 this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
69 ).port1.postMessage(message)
70 }
71
72 /** @inheritDoc */
73 protected sendStartupMessageToWorker (worker: Worker): void {
74 const port2: MessagePort = (
75 this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
76 ).port2
77 worker.postMessage(
78 {
79 ready: false,
80 workerId: worker.threadId,
81 port: port2
82 },
83 [port2]
84 )
85 }
86
87 /** @inheritDoc */
88 protected registerWorkerMessageListener<Message extends Data | Response>(
89 worker: Worker,
90 listener: (message: MessageValue<Message>) => void
91 ): void {
92 (
93 this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
94 ).port1.on('message', listener)
95 }
96
97 /** @inheritDoc */
98 protected createWorker (): Worker {
99 return new Worker(this.filePath, {
100 env: SHARE_ENV,
101 ...this.opts.workerOptions
102 })
103 }
104
105 /** @inheritDoc */
106 protected get type (): PoolType {
107 return PoolTypes.fixed
108 }
109
110 /** @inheritDoc */
111 protected get worker (): WorkerType {
112 return WorkerTypes.thread
113 }
114
115 /** @inheritDoc */
116 protected get minSize (): number {
117 return this.numberOfWorkers
118 }
119
120 /** @inheritDoc */
121 protected get maxSize (): number {
122 return this.numberOfWorkers
123 }
124
125 /** @inheritDoc */
126 protected get busy (): boolean {
127 return this.internalBusy()
128 }
129 }