6 } from
'node:worker_threads'
7 import type { Draft
, MessageValue
} from
'../../utility-types'
8 import { AbstractPool
} from
'../abstract-pool'
18 * A thread worker with message channels for communication between main thread and thread worker.
20 export type ThreadWorkerWithMessageChannel
= Worker
& Draft
<MessageChannel
>
23 * A thread pool with a fixed number of threads.
25 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
27 * This pool selects the threads in a round robin fashion.
29 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
30 * @typeParam Response - Type of execution response. This can only be serializable data.
31 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
34 export class FixedThreadPool
<
37 > extends AbstractPool
<ThreadWorkerWithMessageChannel
, Data
, Response
> {
39 * Constructs a new poolifier fixed thread pool.
41 * @param numberOfThreads - Number of threads for this pool.
42 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
43 * @param opts - Options for this fixed thread pool.
46 numberOfThreads
: number,
48 opts
: PoolOptions
<ThreadWorkerWithMessageChannel
> = {}
50 super(numberOfThreads
, filePath
, opts
)
54 protected isMain (): boolean {
59 protected async destroyWorker (
60 worker
: ThreadWorkerWithMessageChannel
62 this.sendToWorker(worker
, { kill
: 1 })
63 await worker
.terminate()
67 protected sendToWorker (
68 worker
: ThreadWorkerWithMessageChannel
,
69 message
: MessageValue
<Data
>
71 worker
.postMessage(message
)
75 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
76 worker
: ThreadWorkerWithMessageChannel
,
77 listener
: (message
: MessageValue
<Message
>) => void
79 worker
.port2
?.on('message', listener
)
83 protected createWorker (): ThreadWorkerWithMessageChannel
{
84 return new Worker(this.filePath
, {
90 protected afterWorkerSetup (worker
: ThreadWorkerWithMessageChannel
): void {
91 const { port1
, port2
} = new MessageChannel()
92 worker
.postMessage({ parent: port1
}, [port1
])
95 // Listen to worker messages.
96 this.registerWorkerMessageListener(worker
, super.workerListener())
100 protected get
type (): PoolType
{
101 return PoolTypes
.fixed
105 protected get
worker (): WorkerType
{
106 return WorkerTypes
.thread
110 protected get
minSize (): number {
111 return this.numberOfWorkers
115 protected get
maxSize (): number {
116 return this.numberOfWorkers
120 protected get
busy (): boolean {
121 return this.internalBusy()