5 } from
'node:worker_threads'
7 import type { MessageValue
} from
'../../utility-types.js'
9 import { AbstractPool
} from
'../abstract-pool.js'
10 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool.js'
11 import { type WorkerType
, WorkerTypes
} from
'../worker.js'
14 * Options for a poolifier thread pool.
16 export type ThreadPoolOptions
= PoolOptions
<Worker
>
19 * A thread pool with a fixed number of threads.
20 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
22 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
25 export class FixedThreadPool
<
28 > extends AbstractPool
<Worker
, Data
, Response
> {
30 protected get
backPressure (): boolean {
31 return this.internalBackPressure()
35 protected get
busy (): boolean {
36 return this.internalBusy()
40 protected get
type (): PoolType
{
41 return PoolTypes
.fixed
45 protected get
worker (): WorkerType
{
46 return WorkerTypes
.thread
50 * Constructs a new poolifier fixed thread pool.
51 * @param numberOfThreads - Number of threads for this pool.
52 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
53 * @param opts - Options for this fixed thread pool.
54 * @param maximumNumberOfThreads - The maximum number of threads for this pool.
57 numberOfThreads
: number,
59 opts
: ThreadPoolOptions
= {},
60 maximumNumberOfThreads
?: number
62 super(numberOfThreads
, filePath
, opts
, maximumNumberOfThreads
)
66 protected checkAndEmitDynamicWorkerCreationEvents (): void {
71 protected checkAndEmitDynamicWorkerDestructionEvents (): void {
76 protected deregisterWorkerMessageListener
<Message
extends Data
| Response
>(
77 workerNodeKey
: number,
78 listener
: (message
: MessageValue
<Message
>) => void
80 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
.off(
87 protected isMain (): boolean {
92 protected registerOnceWorkerMessageListener
<Message
extends Data
| Response
>(
93 workerNodeKey
: number,
94 listener
: (message
: MessageValue
<Message
>) => void
96 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
.once(
103 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
104 workerNodeKey
: number,
105 listener
: (message
: MessageValue
<Message
>) => void
107 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
.on(
114 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
115 const workerNode
= this.workerNodes
[workerNodeKey
]
116 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
117 const port2
= workerNode
.messageChannel
!.port2
118 workerNode
.worker
.postMessage(
122 workerId
: this.getWorkerInfo(workerNodeKey
)?.id
,
123 } satisfies MessageValue
<Data
>,
129 protected sendToWorker (
130 workerNodeKey
: number,
131 message
: MessageValue
<Data
>,
132 transferList
?: readonly Transferable
[]
134 this.workerNodes
[workerNodeKey
]?.messageChannel
?.port1
.postMessage(
137 workerId
: this.getWorkerInfo(workerNodeKey
)?.id
,
138 } satisfies MessageValue
<Data
>,
144 protected shallCreateDynamicWorker (): boolean {