8 } from
'node:worker_threads'
9 import type { MessageValue
} from
'../../utility-types'
10 import { AbstractPool
} from
'../abstract-pool'
11 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
12 import { type WorkerType
, WorkerTypes
} from
'../worker'
15 * Options for a poolifier thread pool.
17 export interface ThreadPoolOptions
extends PoolOptions
<Worker
> {
21 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
23 workerOptions
?: WorkerOptions
27 * A thread pool with a fixed number of threads.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
31 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
34 export class FixedThreadPool
<
37 > extends AbstractPool
<Worker
, Data
, Response
> {
39 * Constructs a new poolifier fixed thread pool.
41 * @param numberOfThreads - Number of threads for this pool.
42 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
43 * @param opts - Options for this fixed thread pool.
46 numberOfThreads
: number,
48 protected readonly opts
: ThreadPoolOptions
= {}
50 super(numberOfThreads
, filePath
, opts
)
54 protected isMain (): boolean {
59 protected async destroyWorker (worker
: Worker
): Promise
<void> {
60 this.sendToWorker(worker
, { kill
: true, workerId
: worker
.threadId
})
61 const workerInfo
= this.getWorkerInfoByWorker(worker
)
62 workerInfo
.messageChannel
?.port1
.close()
63 workerInfo
.messageChannel
?.port2
.close()
64 await worker
.terminate()
68 protected sendToWorker (worker
: Worker
, message
: MessageValue
<Data
>): void {
70 this.getWorkerInfoByWorker(worker
).messageChannel
as MessageChannel
71 ).port1
.postMessage(message
)
75 protected sendStartupMessageToWorker (worker
: Worker
): void {
76 const port2
: MessagePort
= (
77 this.getWorkerInfoByWorker(worker
).messageChannel
as MessageChannel
82 workerId
: worker
.threadId
,
90 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
92 listener
: (message
: MessageValue
<Message
>) => void
95 this.getWorkerInfoByWorker(worker
).messageChannel
as MessageChannel
96 ).port1
.on('message', listener
)
100 protected createWorker (): Worker
{
101 return new Worker(this.filePath
, {
103 ...this.opts
.workerOptions
108 protected get
type (): PoolType
{
109 return PoolTypes
.fixed
113 protected get
worker (): WorkerType
{
114 return WorkerTypes
.thread
118 protected get
minSize (): number {
119 return this.numberOfWorkers
123 protected get
maxSize (): number {
124 return this.numberOfWorkers
128 protected get
busy (): boolean {
129 return this.internalBusy()