9 } from
'node:worker_threads'
10 import type { MessageValue
} from
'../../utility-types'
11 import { AbstractPool
} from
'../abstract-pool'
12 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
13 import { type WorkerType
, WorkerTypes
} from
'../worker'
/**
 * Options for a poolifier thread pool.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options spread into the `Worker` constructor options when the pool
   * creates a worker thread.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
/**
 * A thread pool with a fixed number of threads.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 */
35 export class FixedThreadPool
<
38 > extends AbstractPool
<Worker
, Data
, Response
> {
/**
 * Constructs a new poolifier fixed thread pool.
 *
 * @param numberOfThreads - Number of threads for this pool.
 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
 * @param opts - Options for this fixed thread pool.
 */
constructor (
  numberOfThreads: number,
  // NOTE(review): the declaration line of this parameter was missing from the
  // extracted text; `string` is inferred from the `@param filePath` description
  // and from the `super` call below — confirm against the upstream file.
  filePath: string,
  protected readonly opts: ThreadPoolOptions = {}
) {
  super(numberOfThreads, filePath, opts)
}
// NOTE(review): only the signature of `isMain` survived extraction; the body
// and closing brace are missing here, so what it returns cannot be documented
// from this text — restore from the upstream file.
55 protected isMain (): boolean {
/**
 * Shuts down the worker node stored at the given key.
 *
 * The node is flagged as not ready and its tasks queue is flushed, then a
 * kill message is sent to the worker, the node's channel is closed, its
 * listeners are removed and the worker thread is terminated.
 *
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  this.flushTasksQueue(workerNodeKey)
  // FIXME: wait for tasks to be finished
  const workerNode = this.workerNodes[workerNodeKey]
  const worker = workerNode.worker
  // The 'exit' listener is registered before the kill message is sent so the
  // event cannot fire before anyone is listening.
  // NOTE(review): `resolve()` and the final `await waitWorkerExit` were not
  // present in the extracted text; they are restored from the otherwise
  // unused `resolve` parameter and `waitWorkerExit` constant — confirm.
  const waitWorkerExit = new Promise<void>(resolve => {
    worker.once('exit', () => {
      resolve()
    })
  })
  await this.sendKillMessageToWorker(workerNodeKey)
  workerNode.closeChannel()
  workerNode.removeAllListeners()
  await worker.terminate()
  await waitWorkerExit
}
/**
 * Posts a message on `port1` of the message channel owned by the worker node
 * at the given key, tagging it with that worker's id.
 *
 * Nothing is sent when the worker node, its message channel or `port1` is
 * absent (optional chaining short-circuits the call).
 *
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param message - Message to send; `workerId` is added to a shallow copy of it.
 * @param transferList - Optional list of objects to transfer with the message.
 */
protected sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void {
  // NOTE(review): the second `postMessage` argument was missing from the
  // extracted text; `transferList` is restored because the first argument
  // ends with a trailing comma and the parameter is otherwise unused — confirm.
  (
    this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
  )?.port1?.postMessage(
    { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
    transferList
  )
}
// Posts a startup message directly on the worker of the node at
// `workerNodeKey` (via `worker.postMessage`); the message carries the
// `workerId` obtained from `getWorkerInfo(workerNodeKey)`.
// NOTE(review): several lines of this method are missing from the extracted
// text (the property access completing the `port2` initialiser, the other
// fields of the message object, any second `postMessage` argument and the
// closing braces). Presumably `port2` of the node's message channel is handed
// to the worker here — confirm against the upstream file.
93 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
94 const workerNode
= this.workerNodes
[workerNodeKey
]
95 const port2
: MessagePort
= (workerNode
.messageChannel
as MessageChannel
)
97 workerNode
.worker
.postMessage(
100 workerId
: this.getWorkerInfo(workerNodeKey
).id
,
/**
 * Subscribes `listener` to every 'message' event emitted on `port1` of the
 * message channel owned by the worker node at the given key.
 *
 * @typeParam Message - Payload type carried by the received message.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - Callback invoked with each received message.
 */
protected registerWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  (
    this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
  ).port1.on('message', listener)
}
/**
 * Subscribes `listener` to the next 'message' event only, on `port1` of the
 * message channel owned by the worker node at the given key.
 *
 * @typeParam Message - Payload type carried by the received message.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - Callback invoked once with the received message.
 */
protected registerOnceWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  (
    this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
  ).port1.once('message', listener)
}
/**
 * Removes `listener` from the 'message' event of `port1` of the message
 * channel owned by the worker node at the given key.
 *
 * @typeParam Message - Payload type carried by the received message.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - Callback previously registered for 'message' events.
 */
protected deregisterWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  (
    this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
  ).port1.off('message', listener)
}
// Builds a new `Worker` for `this.filePath`, spreading
// `this.opts.workerOptions` into the constructor options.
// NOTE(review): the extracted text skips one line inside the options literal
// (between source lines 139 and 141) as well as the closing braces, so an
// additional option may be set before the spread — confirm against the
// upstream file.
138 protected createWorker (): Worker
{
139 return new Worker(this.filePath
, {
141 ...this.opts
.workerOptions
/**
 * Pool type of this pool: always `PoolTypes.fixed`.
 */
protected get type (): PoolType {
  return PoolTypes.fixed
}
/**
 * Worker type used by this pool: always `WorkerTypes.thread`.
 */
protected get worker (): WorkerType {
  return WorkerTypes.thread
}
/**
 * Whether the pool is busy, as reported by `internalBusy()`.
 */
protected get busy (): boolean {
  return this.internalBusy()
}