7 } from
'node:worker_threads'
8 import type { MessageValue
} from
'../../utility-types'
9 import { AbstractPool
} from
'../abstract-pool'
10 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
11 import { type WorkerType
, WorkerTypes
} from
'../worker'
14 * A thread pool with a fixed number of threads.
16 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
17 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
18 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
21 export class FixedThreadPool
<
24 > extends AbstractPool
<Worker
, Data
, Response
> {
26 * Constructs a new poolifier fixed thread pool.
28 * @param numberOfThreads - Number of threads for this pool.
29 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
30 * @param opts - Options for this fixed thread pool.
33 numberOfThreads
: number,
35 protected readonly opts
: PoolOptions
<Worker
> = {}
37 super(numberOfThreads
, filePath
, opts
)
41 protected isMain (): boolean {
46 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
47 this.flagWorkerNodeAsNotReady(workerNodeKey
)
48 this.flushTasksQueue(workerNodeKey
)
49 // FIXME: wait for tasks to be finished
50 const workerNode
= this.workerNodes
[workerNodeKey
]
51 const waitWorkerExit
= new Promise
<void>(resolve
=> {
52 workerNode
.registerOnceWorkerEventHandler('exit', () => {
56 await this.sendKillMessageToWorker(workerNodeKey
)
57 workerNode
.closeChannel()
58 workerNode
.removeAllListeners()
59 await workerNode
.worker
.terminate()
64 protected sendToWorker (
65 workerNodeKey
: number,
66 message
: MessageValue
<Data
>,
67 transferList
?: TransferListItem
[]
69 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.postMessage(
70 { ...message
, workerId
: this.getWorkerInfo(workerNodeKey
).id
},
76 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
77 const workerNode
= this.workerNodes
[workerNodeKey
]
78 const port2
: MessagePort
= (workerNode
.messageChannel
as MessageChannel
)
80 workerNode
.worker
.postMessage(
83 workerId
: this.getWorkerInfo(workerNodeKey
).id
,
91 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
92 workerNodeKey
: number,
93 listener
: (message
: MessageValue
<Message
>) => void
95 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.on(
102 protected registerOnceWorkerMessageListener
<Message
extends Data
| Response
>(
103 workerNodeKey
: number,
104 listener
: (message
: MessageValue
<Message
>) => void
106 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.once(
113 protected deregisterWorkerMessageListener
<Message
extends Data
| Response
>(
114 workerNodeKey
: number,
115 listener
: (message
: MessageValue
<Message
>) => void
117 this.workerNodes
[workerNodeKey
].messageChannel
?.port1
?.off(
124 protected get
type (): PoolType
{
125 return PoolTypes
.fixed
129 protected get
worker (): WorkerType
{
130 return WorkerTypes
.thread
134 protected get
busy (): boolean {
135 return this.internalBusy()