7 } from
'node:worker_threads'
8 import type { Draft
, MessageValue
} from
'../../utility-types'
9 import { AbstractPool
} from
'../abstract-pool'
/**
 * Options for a poolifier thread pool.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options for the `worker_threads` workers of the pool.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
/**
 * A thread worker with message channels for communication between main thread and thread worker.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
/**
 * A thread pool with a fixed number of threads.
 *
 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
 *
 * This pool selects the threads in a round robin fashion.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 */
47 export class FixedThreadPool
<
50 > extends AbstractPool
<ThreadWorkerWithMessageChannel
, Data
, Response
> {
/**
 * Constructs a new poolifier fixed thread pool.
 *
 * @param numberOfThreads - Number of threads for this pool.
 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
 * @param opts - Options for this fixed thread pool.
 */
// Constructor parameters followed by the call that forwards them to the base class.
// `opts` is a parameter property (protected readonly) defaulting to `{}`.
// NOTE(review): this excerpt is missing original lines 58, 60 and 62 — presumably the
// `constructor (` opener, the declaration of the `filePath` parameter that is passed to
// super() below, and the `) {` line. Restore them from the upstream file; do not infer
// the `filePath` type from here.
59 numberOfThreads
: number,
61 protected readonly opts
: ThreadPoolOptions
= {}
63 super(numberOfThreads
, filePath
, opts
)
// Returns a boolean; the name suggests a main-thread check — TODO confirm.
// NOTE(review): the method body (original line 68) and closing brace are missing from
// this excerpt, so what is actually returned cannot be determined here. Restore from upstream.
67 protected isMain (): boolean {
/**
 * Shuts down a single worker of the pool.
 *
 * A `{ kill: 1 }` message is first sent to the worker through `sendToWorker`,
 * then the worker thread is terminated and its termination awaited.
 *
 * @param worker - The worker to destroy.
 */
protected async destroyWorker (
  worker: ThreadWorkerWithMessageChannel
): Promise<void> {
  this.sendToWorker(worker, { kill: 1 })
  await worker.terminate()
}
/**
 * Posts a message to the given worker using `worker.postMessage`.
 *
 * @param worker - The worker that receives the message.
 * @param message - The message to send.
 */
protected sendToWorker (
  worker: ThreadWorkerWithMessageChannel,
  message: MessageValue<Data>
): void {
  worker.postMessage(message)
}
/**
 * Subscribes a listener to the `'message'` events of the worker's `port2`.
 *
 * The subscription goes through optional chaining, so nothing is registered
 * when `worker.port2` is not set.
 *
 * @typeParam Message - Type of the payload carried by the received messages.
 * @param worker - The worker whose `port2` is listened to.
 * @param listener - Callback invoked with each received message.
 */
protected registerWorkerMessageListener<Message extends Data | Response>(
  worker: ThreadWorkerWithMessageChannel,
  listener: (message: MessageValue<Message>) => void
): void {
  worker.port2?.on('message', listener)
}
// Creates a new Worker from `this.filePath`; `this.opts.workerOptions` is spread into the
// options object passed to the Worker constructor.
// NOTE(review): original line 98 (an entry of the options literal placed before the spread,
// so `workerOptions` could override it) and the closing lines are missing from this excerpt.
// Restore them from the upstream file before changing this method.
96 protected createWorker (): ThreadWorkerWithMessageChannel
{
97 return new Worker(this.filePath
, {
99 ...this.opts
.workerOptions
// Post-creation setup of a worker: creates a MessageChannel, transfers `port1` to the
// worker inside a `{ parent: port1 }` message, then registers the listener returned by
// `super.workerListener()` for messages coming from the worker.
// NOTE(review): original lines 107–108 are missing from this excerpt. `port2` is otherwise
// unused here while registerWorkerMessageListener reads `worker.port2`, so they presumably
// store the ports on the worker — confirm against the upstream file.
104 protected afterWorkerSetup (worker
: ThreadWorkerWithMessageChannel
): void {
105 const { port1
, port2
} = new MessageChannel()
106 worker
.postMessage({ parent: port1
}, [port1
])
109 // Listen to worker messages.
110 this.registerWorkerMessageListener(worker
, super.workerListener())
/**
 * Pool type of this pool: always `PoolTypes.fixed`.
 */
protected get type (): PoolType {
  return PoolTypes.fixed
}
/**
 * Worker type used by this pool: always `WorkerTypes.thread`.
 */
protected get worker (): WorkerType {
  return WorkerTypes.thread
}
/**
 * Minimum size of the pool; reported as `this.numberOfWorkers`, the same value as `maxSize`.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
/**
 * Maximum size of the pool; reported as `this.numberOfWorkers`, the same value as `minSize`.
 */
protected get maxSize (): number {
  return this.numberOfWorkers
}
/**
 * Whether the pool is busy; delegates to `this.internalBusy()`.
 */
protected get busy (): boolean {
  return this.internalBusy()
}