chore: cleanup `@std/expect` imports
[poolifier.git] / src / pools / thread / fixed.ts
... / ...
CommitLineData
1import {
2 isMainThread,
3 type TransferListItem,
4 type Worker,
5} from 'node:worker_threads'
6
7import type { MessageValue } from '../../utility-types.js'
8
9import { AbstractPool } from '../abstract-pool.js'
10import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
11import { type WorkerType, WorkerTypes } from '../worker.js'
12
/**
 * Options for a poolifier thread pool.
 *
 * Alias of the generic `PoolOptions` type specialized with the `Worker` type
 * imported from `node:worker_threads`.
 */
export type ThreadPoolOptions = PoolOptions<Worker>
17
/**
 * A thread pool with a fixed number of threads.
 *
 * Messages are exchanged with each worker through the `port1` end of the
 * worker node's message channel; the `port2` end is transferred to the worker
 * by the startup message.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * All arguments are forwarded unchanged to the parent pool constructor.
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   * @param maximumNumberOfThreads - The maximum number of threads for this pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: ThreadPoolOptions = {},
    maximumNumberOfThreads?: number
  ) {
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    // Intentionally empty: this class never asks for a dynamic worker
    // (shallCreateDynamicWorker() returns false), so there is nothing to emit.
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerDestructionEvents (): void {
    // Intentionally empty: this class never asks for a dynamic worker
    // (shallCreateDynamicWorker() returns false), so there is nothing to emit.
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const workerNode = this.workerNodes[workerNodeKey]
    // Skipped silently when the worker node has no message channel.
    workerNode.messageChannel?.port1.off('message', listener)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const workerNode = this.workerNodes[workerNodeKey]
    // Skipped silently when the worker node has no message channel.
    workerNode.messageChannel?.port1.once('message', listener)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const workerNode = this.workerNodes[workerNodeKey]
    // Skipped silently when the worker node has no message channel.
    workerNode.messageChannel?.port1.on('message', listener)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const node = this.workerNodes[workerNodeKey]
    // The worker-side port is both referenced in the payload and listed in the
    // transfer list of the same postMessage() call.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const port = node.messageChannel!.port2
    const startupMessage = {
      port,
      ready: false,
      workerId: this.getWorkerInfo(workerNodeKey)?.id,
    } satisfies MessageValue<Data>
    node.worker.postMessage(startupMessage, [port])
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: readonly TransferListItem[]
  ): void {
    const channel = this.workerNodes[workerNodeKey]?.messageChannel
    if (channel == null) {
      // No worker node at this key, or it has no message channel: drop the message.
      return
    }
    // Stamp the outgoing message with the id of the targeted worker.
    const outgoing = {
      ...message,
      workerId: this.getWorkerInfo(workerNodeKey)?.id,
    } satisfies MessageValue<Data>
    channel.port1.postMessage(outgoing, transferList)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // This class never requests an additional worker.
    return false
  }

  /** @inheritDoc */
  protected get backPressure (): boolean {
    return this.internalBackPressure()
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }
}