refactor: cleanup worker id handling code
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 SHARE_ENV,
5 Worker,
6 type WorkerOptions,
7 isMainThread
8 } from 'node:worker_threads'
9 import type { MessageValue } from '../../utility-types'
10 import { AbstractPool } from '../abstract-pool'
11 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
12 import { type WorkerType, WorkerTypes } from '../worker'
13
/**
 * Configuration accepted by a poolifier thread pool: the generic pool options
 * specialised for `Worker`, plus the options used when creating each worker thread.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options passed to the `Worker` constructor of `node:worker_threads`.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
25
/**
 * A pool of worker threads whose size is fixed: its minimum and maximum sizes
 * are both the configured number of workers.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (worker: Worker): Promise<void> {
    // Send the kill message first, while the channel is still open.
    this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
    const channel = this.getWorkerInfoByWorker(worker).messageChannel
    if (channel != null) {
      // Close both ends of the channel before terminating the thread.
      channel.port1.close()
      channel.port2.close()
    }
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
    const channel = this.getWorkerInfoByWorker(worker)
      .messageChannel as MessageChannel
    // Pool-to-worker messages are posted on the pool-side port.
    channel.port1.postMessage(message)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (worker: Worker): void {
    const channel = this.getWorkerInfoByWorker(worker)
      .messageChannel as MessageChannel
    const workerPort: MessagePort = channel.port2
    const startupMessage = {
      ready: false,
      workerId: worker.threadId,
      port: workerPort
    }
    // The port is also listed in the transfer list so that it is transferred
    // to the worker thread along with the startup message.
    worker.postMessage(startupMessage, [workerPort])
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: Worker,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.getWorkerInfoByWorker(worker)
      .messageChannel as MessageChannel
    // Listen on the pool-side port of the worker's channel.
    channel.port1.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // `env` defaults to SHARE_ENV; user-supplied worker options are spread
    // afterwards and therefore take precedence.
    const workerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    }
    return new Worker(this.filePath, workerOptions)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Fixed pool: the minimum size is the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Fixed pool: the maximum size is the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}