6 } from
'node:worker_threads'
7 import type { Draft
, MessageValue
} from
'../../utility-types'
8 import { AbstractPool
} from
'../abstract-pool'
9 import type { PoolOptions
} from
'../pool'
10 import { PoolType
} from
'../pool'
/**
 * A worker thread type combined with `Draft<MessageChannel>`, so the
 * message channel members used for communication between the main thread
 * and the worker thread can be carried on the worker object itself.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
18 * A thread pool with a fixed number of threads.
20 * Tasks can be executed synchronously or asynchronously, as you prefer.
22 * This pool selects the threads in a round-robin fashion.
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
25 * @typeParam Response - Type of execution response. This can only be serializable data.
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
29 export class FixedThreadPool
<
32 > extends AbstractPool
<ThreadWorkerWithMessageChannel
, Data
, Response
> {
34 * Constructs a new poolifier fixed thread pool.
36 * @param numberOfThreads - Number of threads for this pool.
37 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
38 * @param opts - Options for this fixed thread pool.
// Constructor parameter list and body. The `constructor (` header line is
// not visible in this extract.
// NOTE(review): `filePath` is forwarded to super() below, but its parameter
// declaration is not visible here — presumably it sits between
// `numberOfThreads` and `opts`; confirm against the full file.
41 numberOfThreads
: number,
// `opts` falls back to an empty options object when the caller omits it.
43 opts
: PoolOptions
<ThreadWorkerWithMessageChannel
> = {}
// All three arguments are passed through unchanged to the AbstractPool
// constructor; no extra work is visible in this constructor.
45 super(numberOfThreads
, filePath
, opts
)
// Returns a boolean. Judging by the name, this presumably reports whether the
// code is running on the main thread — the body is not visible in this
// extract (TODO confirm against the full file).
49 protected isMain (): boolean {
/**
 * Shuts a worker down: it is first sent a kill message, then its thread is
 * terminated and the termination is awaited.
 *
 * @param worker - The worker to destroy.
 */
protected async destroyWorker (
  worker: ThreadWorkerWithMessageChannel
): Promise<void> {
  const killMessage: MessageValue<Data> = { kill: 1 }
  this.sendToWorker(worker, killMessage)
  await worker.terminate()
}
/**
 * Posts a message to the given worker thread.
 *
 * @param worker - The worker that receives the message.
 * @param message - The message to post.
 */
protected sendToWorker (
  worker: ThreadWorkerWithMessageChannel,
  message: MessageValue<Data>
): void {
  worker.postMessage(message)
}
/**
 * Subscribes a listener to the `message` events emitted on the worker's
 * `port2`. Nothing is registered when the worker has no `port2`.
 *
 * @param worker - The worker whose `port2` is listened to.
 * @param listener - Callback invoked with each received message.
 */
protected registerWorkerMessageListener<Message extends Data | Response>(
  worker: ThreadWorkerWithMessageChannel,
  listener: (message: MessageValue<Message>) => void
): void {
  const { port2 } = worker
  // Same guard as optional chaining: skip when the port is null or undefined.
  if (port2 != null) {
    port2.on('message', listener)
  }
}
// Constructs a new `Worker` from `this.filePath` and returns it.
// NOTE(review): the contents of the options object passed as the second
// argument are cut off in this extract — confirm against the full file.
78 protected createWorker (): ThreadWorkerWithMessageChannel
{
79 return new Worker(this.filePath
, {
// Post-creation setup for a worker: opens a MessageChannel, hands one end to
// the worker, then subscribes the pool's listener to the worker's messages.
85 protected afterWorkerSetup (worker
: ThreadWorkerWithMessageChannel
): void {
// Create the pair of connected ports.
86 const { port1
, port2
} = new MessageChannel()
// Send `port1` to the worker under the `parent` key; it is also listed in the
// second argument (the transfer list) of postMessage.
87 worker
.postMessage({ parent: port1
}, [port1
])
// NOTE(review): original lines 88–89 are not visible in this extract. `port2`
// is unused in the visible code, yet registerWorkerMessageListener reads
// `worker.port2` — presumably the ports are attached to `worker` here; confirm.
90 // Listen to worker messages.
91 this.registerWorkerMessageListener(worker
, super.workerListener())
// Public read-only accessor exposing this pool's `PoolType`.
// NOTE(review): the returned value is not visible in this extract.
95 public get
type (): PoolType
{
/**
 * Whether the number of worker nodes held by the pool equals
 * `numberOfWorkers`.
 */
protected get full (): boolean {
  const nodeCount = this.workerNodes.length
  return nodeCount === this.numberOfWorkers
}
// Whether the pool is busy. Delegates entirely to `this.internalBusy()`, which
// is defined outside this extract (presumably inherited from AbstractPool).
105 protected get
busy (): boolean {
106 return this.internalBusy()