build: switch default to ESM
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 type TransferListItem,
5 type Worker,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { MessageValue } from '../../utility-types.js'
9 import { AbstractPool } from '../abstract-pool.js'
10 import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
11 import { type WorkerType, WorkerTypes } from '../worker.js'
12
/**
 * Options accepted by poolifier thread pools.
 *
 * This is the generic `PoolOptions` type specialized for `node:worker_threads` `Worker` instances.
 */
export type ThreadPoolOptions = PoolOptions<Worker>
17
/**
 * A thread pool with a fixed number of threads.
 *
 * Pool-side traffic (sending messages, attaching and detaching listeners) goes through `port1`
 * of each worker node's message channel; `port2` is transferred to the worker thread inside the
 * startup message.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool.
   * @param maximumNumberOfThreads - Optional value forwarded unchanged to the `AbstractPool` constructor; presumably the upper bound used by dynamic pool subclasses (confirm against `AbstractPool`).
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: ThreadPoolOptions = {},
    maximumNumberOfThreads?: number
  ) {
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    if (poolPort == null) {
      // No pool-side port for this worker node: nothing is sent and the worker info is not looked up.
      return
    }
    // The message is copied and stamped with the identifier of the worker it is addressed to.
    poolPort.postMessage(
      { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
      transferList
    )
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const targetNode = this.workerNodes[workerNodeKey]
    const workerSidePort: MessagePort = (
      targetNode.messageChannel as MessageChannel
    ).port2
    // The port appears both in the payload and in the transfer list, so it is moved to the
    // worker thread rather than cloned.
    targetNode.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey).id,
        port: workerSidePort
      },
      [workerSidePort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // Skipped when the worker node has no pool-side port.
    poolPort?.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // Skipped when the worker node has no pool-side port.
    poolPort?.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // Skipped when the worker node has no pool-side port.
    poolPort?.off('message', listener)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // A fixed pool never asks for an additional dynamic worker.
    return false
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    /* noop */
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}