c4256bf3b2213d8d7a6a60d7df8a95447576b888
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessagePort,
3 type TransferListItem,
4 type Worker,
5 isMainThread
6 } from 'node:worker_threads'
7 import type { MessageValue } from '../../utility-types.js'
8 import { AbstractPool } from '../abstract-pool.js'
9 import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
10 import { type WorkerType, WorkerTypes } from '../worker.js'
11
12 /**
13 * Options for a poolifier thread pool.
14 */
15 export type ThreadPoolOptions = PoolOptions<Worker>
16
17 /**
18 * A thread pool with a fixed number of threads.
19 *
20 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
22 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
23 * @since 0.0.1
24 */
25 export class FixedThreadPool<
26 Data = unknown,
27 Response = unknown
28 > extends AbstractPool<Worker, Data, Response> {
29 /**
30 * Constructs a new poolifier fixed thread pool.
31 *
32 * @param numberOfThreads - Number of threads for this pool.
33 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
34 * @param opts - Options for this fixed thread pool.
35 */
36 public constructor (
37 numberOfThreads: number,
38 filePath: string,
39 opts: ThreadPoolOptions = {},
40 maximumNumberOfThreads?: number
41 ) {
42 super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
43 }
44
45 /** @inheritDoc */
46 protected isMain (): boolean {
47 return isMainThread
48 }
49
50 /** @inheritDoc */
51 protected sendToWorker (
52 workerNodeKey: number,
53 message: MessageValue<Data>,
54 transferList?: TransferListItem[]
55 ): void {
56 this.workerNodes[workerNodeKey].messageChannel?.port1?.postMessage(
57 { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
58 transferList
59 )
60 }
61
62 /** @inheritDoc */
63 protected sendStartupMessageToWorker (workerNodeKey: number): void {
64 const workerNode = this.workerNodes[workerNodeKey]
65 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
66 const port2: MessagePort = workerNode.messageChannel!.port2
67 workerNode.worker.postMessage(
68 {
69 ready: false,
70 workerId: this.getWorkerInfo(workerNodeKey).id,
71 port: port2
72 },
73 [port2]
74 )
75 }
76
77 /** @inheritDoc */
78 protected registerWorkerMessageListener<Message extends Data | Response>(
79 workerNodeKey: number,
80 listener: (message: MessageValue<Message>) => void
81 ): void {
82 this.workerNodes[workerNodeKey].messageChannel?.port1?.on(
83 'message',
84 listener
85 )
86 }
87
88 /** @inheritDoc */
89 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
90 workerNodeKey: number,
91 listener: (message: MessageValue<Message>) => void
92 ): void {
93 this.workerNodes[workerNodeKey].messageChannel?.port1?.once(
94 'message',
95 listener
96 )
97 }
98
99 /** @inheritDoc */
100 protected deregisterWorkerMessageListener<Message extends Data | Response>(
101 workerNodeKey: number,
102 listener: (message: MessageValue<Message>) => void
103 ): void {
104 this.workerNodes[workerNodeKey].messageChannel?.port1?.off(
105 'message',
106 listener
107 )
108 }
109
110 /** @inheritDoc */
111 protected shallCreateDynamicWorker (): boolean {
112 return false
113 }
114
115 /** @inheritDoc */
116 protected checkAndEmitDynamicWorkerCreationEvents (): void {
117 /* noop */
118 }
119
120 /** @inheritDoc */
121 protected get type (): PoolType {
122 return PoolTypes.fixed
123 }
124
125 /** @inheritDoc */
126 protected get worker (): WorkerType {
127 return WorkerTypes.thread
128 }
129
130 /** @inheritDoc */
131 protected get busy (): boolean {
132 return this.internalBusy()
133 }
134 }