6 } from
'node:worker_threads'
7 import type { Draft
, MessageValue
} from
'../../utility-types'
8 import { AbstractPool
} from
'../abstract-pool'
9 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
12 * A thread worker with message channels for communication between main thread and thread worker.
14 export type ThreadWorkerWithMessageChannel
= Worker
& Draft
<MessageChannel
>
17 * A thread pool with a fixed number of threads.
19 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
21 * This pool selects the threads in a round robin fashion.
23 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
24 * @typeParam Response - Type of execution response. This can only be serializable data.
25 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
28 export class FixedThreadPool
<
31 > extends AbstractPool
<ThreadWorkerWithMessageChannel
, Data
, Response
> {
33 * Constructs a new poolifier fixed thread pool.
35 * @param numberOfThreads - Number of threads for this pool.
36 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
37 * @param opts - Options for this fixed thread pool.
40 numberOfThreads
: number,
42 opts
: PoolOptions
<ThreadWorkerWithMessageChannel
> = {}
44 super(numberOfThreads
, filePath
, opts
)
48 protected isMain (): boolean {
53 protected async destroyWorker (
54 worker
: ThreadWorkerWithMessageChannel
56 this.sendToWorker(worker
, { kill
: 1 })
57 await worker
.terminate()
61 protected sendToWorker (
62 worker
: ThreadWorkerWithMessageChannel
,
63 message
: MessageValue
<Data
>
65 worker
.postMessage(message
)
69 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
70 worker
: ThreadWorkerWithMessageChannel
,
71 listener
: (message
: MessageValue
<Message
>) => void
73 worker
.port2
?.on('message', listener
)
77 protected createWorker (): ThreadWorkerWithMessageChannel
{
78 return new Worker(this.filePath
, {
84 protected afterWorkerSetup (worker
: ThreadWorkerWithMessageChannel
): void {
85 const { port1
, port2
} = new MessageChannel()
86 worker
.postMessage({ parent: port1
}, [port1
])
89 // Listen to worker messages.
90 this.registerWorkerMessageListener(worker
, super.workerListener())
94 public get
type (): PoolType
{
95 return PoolTypes
.fixed
99 protected get
minSize (): number {
100 return this.numberOfWorkers
104 protected get
maxSize (): number {
105 return this.numberOfWorkers
109 protected get
full (): boolean {
110 return this.workerNodes
.length
>= this.numberOfWorkers
114 protected get
busy (): boolean {
115 return this.internalBusy()