refactor: add type enforcement to message passing
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type TransferListItem,
3 type Worker,
4 isMainThread
5 } from 'node:worker_threads'
6 import type { MessageValue } from '../../utility-types.js'
7 import { AbstractPool } from '../abstract-pool.js'
8 import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
9 import { type WorkerType, WorkerTypes } from '../worker.js'
10
/**
 * Options accepted by a poolifier thread pool.
 *
 * This is the generic `PoolOptions` type specialized for `Worker` instances
 * from `node:worker_threads`.
 */
export type ThreadPoolOptions = PoolOptions<Worker>
15
/**
 * A thread pool with a fixed number of threads.
 *
 * Regular messages are exchanged over `port1` of each worker node's message
 * channel; only the startup message is posted on the worker itself, carrying
 * `port2` of that channel in its transfer list.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * All arguments are forwarded unchanged to the base pool constructor.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   * @param maximumNumberOfThreads - Optional maximum number of threads, passed through to the base pool
   * (presumably meaningful only for dynamic pool subclasses - confirm against the base class).
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: ThreadPoolOptions = {},
    maximumNumberOfThreads?: number
  ) {
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Straight from node:worker_threads: true when not running inside a worker thread.
    return isMainThread
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // No message channel on this worker node: nothing is sent and nothing is built.
    if (channel == null) {
      return
    }
    channel.port1.postMessage(
      {
        ...message,
        // Stamp the outgoing message with the id of the targeted worker.
        workerId: this.getWorkerInfo(workerNodeKey)?.id
      } satisfies MessageValue<Data>,
      transferList
    )
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const targetNode = this.workerNodes[workerNodeKey]
    // The channel is asserted to exist here; if it does not, this line throws.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const workerSidePort = targetNode.messageChannel!.port2
    // Posted on the worker itself (not on the channel) and with the port in
    // the transfer list, so the port is handed over to the worker thread.
    targetNode.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey)?.id,
        port: workerSidePort
      } satisfies MessageValue<Data>,
      [workerSidePort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // Persistent 'message' listener on the pool-side port; skipped when there is no channel.
    channel?.port1.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // One-shot 'message' listener on the pool-side port; skipped when there is no channel.
    channel?.port1.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // Detach a previously registered 'message' listener; skipped when there is no channel.
    channel?.port1.off('message', listener)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // A fixed pool never asks for a dynamic worker.
    return false
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    /* noop */
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Delegates entirely to the base class busy computation.
    return this.internalBusy()
  }
}