7 } from
'node:worker_threads'
8 import type { MessageValue
} from
'../../utility-types'
9 import { AbstractPool
} from
'../abstract-pool'
10 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
11 import { type WorkerType
, WorkerTypes
} from
'../worker'
14 * Options for a poolifier thread pool.
16 export type ThreadPoolOptions
= PoolOptions
<Worker
>
19 * A thread pool with a fixed number of threads.
21 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
22 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
23 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
26 export class FixedThreadPool
<
29 > extends AbstractPool
<Worker
, Data
, Response
> {
31 * Constructs a new poolifier fixed thread pool.
33 * @param numberOfThreads - Number of threads for this pool.
34 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
35 * @param opts - Options for this fixed thread pool.
38 numberOfThreads
: number,
40 opts
: ThreadPoolOptions
= {},
41 maximumNumberOfThreads
?: number
43 super(numberOfThreads
, filePath
, opts
, maximumNumberOfThreads
)
47 protected isMain (): boolean {
52 protected sendToWorker (
53 workerNodeKey
: number,
54 message
: MessageValue
<Data
>,
55 transferList
?: TransferListItem
[]
57 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.postMessage(
58 { ...message
, workerId
: this.getWorkerInfo(workerNodeKey
).id
},
64 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
65 const workerNode
= this.workerNodes
[workerNodeKey
]
66 const port2
: MessagePort
= (workerNode
.messageChannel
as MessageChannel
)
68 workerNode
.worker
.postMessage(
71 workerId
: this.getWorkerInfo(workerNodeKey
).id
,
79 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
80 workerNodeKey
: number,
81 listener
: (message
: MessageValue
<Message
>) => void
83 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.on(
90 protected registerOnceWorkerMessageListener
<Message
extends Data
| Response
>(
91 workerNodeKey
: number,
92 listener
: (message
: MessageValue
<Message
>) => void
94 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.once(
101 protected deregisterWorkerMessageListener
<Message
extends Data
| Response
>(
102 workerNodeKey
: number,
103 listener
: (message
: MessageValue
<Message
>) => void
105 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.off(
112 protected shallCreateDynamicWorker (): boolean {
117 protected checkAndEmitDynamicWorkerCreationEvents (): void {
122 protected get
type (): PoolType
{
123 return PoolTypes
.fixed
127 protected get
worker (): WorkerType
{
128 return WorkerTypes
.thread
132 protected get
busy (): boolean {
133 return this.internalBusy()