Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
85aeb3f3
JB
2 type MessageChannel,
3 type MessagePort,
fc3e6586 4 SHARE_ENV,
7d91a8cd 5 type TransferListItem,
65d7a1c9 6 Worker,
90082c8c 7 type WorkerOptions,
65d7a1c9 8 isMainThread
fc3e6586 9} from 'node:worker_threads'
e102732c 10import type { MessageValue } from '../../utility-types'
c97c7edb 11import { AbstractPool } from '../abstract-pool'
4b628b48
JB
12import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
13import { type WorkerType, WorkerTypes } from '../worker'
4ade5f1f 14
90082c8c
JB
15/**
16 * Options for a poolifier thread pool.
17 */
18export interface ThreadPoolOptions extends PoolOptions<Worker> {
19 /**
20 * Worker options.
21 *
22 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
23 */
24 workerOptions?: WorkerOptions
25}
26
4ade5f1f 27/**
729c563d
S
28 * A thread pool with a fixed number of threads.
29 *
e102732c
JB
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
4ade5f1f
S
32 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
33 * @since 0.0.1
34 */
d3c8a1a8 35export class FixedThreadPool<
deb85c12
JB
36 Data = unknown,
37 Response = unknown
e102732c 38> extends AbstractPool<Worker, Data, Response> {
4ade5f1f 39 /**
729c563d
S
40 * Constructs a new poolifier fixed thread pool.
41 *
38e795c1
JB
42 * @param numberOfThreads - Number of threads for this pool.
43 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
44 * @param opts - Options for this fixed thread pool.
4ade5f1f
S
45 */
46 public constructor (
5c5a1fb7 47 numberOfThreads: number,
c97c7edb 48 filePath: string,
90082c8c 49 protected readonly opts: ThreadPoolOptions = {}
4ade5f1f 50 ) {
5c5a1fb7 51 super(numberOfThreads, filePath, opts)
c97c7edb 52 }
4ade5f1f 53
afc003b2 54 /** @inheritDoc */
c97c7edb
S
55 protected isMain (): boolean {
56 return isMainThread
4ade5f1f
S
57 }
58
afc003b2 59 /** @inheritDoc */
aa9eede8 60 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
ae3ab61d 61 this.flagWorkerNodeAsNotReady(workerNodeKey)
81c02522
JB
62 this.flushTasksQueue(workerNodeKey)
63 // FIXME: wait for tasks to be finished
aa9eede8
JB
64 const workerNode = this.workerNodes[workerNodeKey]
65 const worker = workerNode.worker
041dc05b 66 const waitWorkerExit = new Promise<void>(resolve => {
ae036c3e 67 worker.once('exit', () => {
81c02522
JB
68 resolve()
69 })
70 })
72ae84a2 71 await this.sendKillMessageToWorker(workerNodeKey)
aa9eede8 72 workerNode.closeChannel()
78f60f82 73 workerNode.removeAllListeners()
c97c7edb 74 await worker.terminate()
c2301b8e 75 await waitWorkerExit
4ade5f1f
S
76 }
77
afc003b2 78 /** @inheritDoc */
aa9eede8
JB
79 protected sendToWorker (
80 workerNodeKey: number,
7d91a8cd
JB
81 message: MessageValue<Data>,
82 transferList?: TransferListItem[]
aa9eede8 83 ): void {
041dc05b 84 (
ae3ab61d 85 this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
9761e404 86 )?.port1?.postMessage(
dbfa7948 87 { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
72ae84a2
JB
88 transferList
89 )
85aeb3f3
JB
90 }
91
92 /** @inheritDoc */
aa9eede8 93 protected sendStartupMessageToWorker (workerNodeKey: number): void {
75de9f41 94 const workerNode = this.workerNodes[workerNodeKey]
75de9f41
JB
95 const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
96 .port2
e9dd5b66 97 workerNode.worker.postMessage(
85aeb3f3
JB
98 {
99 ready: false,
dbfa7948 100 workerId: this.getWorkerInfo(workerNodeKey).id,
85aeb3f3
JB
101 port: port2
102 },
103 [port2]
104 )
105 }
106
107 /** @inheritDoc */
108 protected registerWorkerMessageListener<Message extends Data | Response>(
aa9eede8 109 workerNodeKey: number,
85aeb3f3
JB
110 listener: (message: MessageValue<Message>) => void
111 ): void {
041dc05b 112 (
ae3ab61d 113 this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
85aeb3f3 114 ).port1.on('message', listener)
4ade5f1f
S
115 }
116
ae036c3e
JB
117 /** @inheritDoc */
118 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
119 workerNodeKey: number,
120 listener: (message: MessageValue<Message>) => void
121 ): void {
122 (
ae3ab61d 123 this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
ae036c3e
JB
124 ).port1.once('message', listener)
125 }
126
127 /** @inheritDoc */
128 protected deregisterWorkerMessageListener<Message extends Data | Response>(
129 workerNodeKey: number,
130 listener: (message: MessageValue<Message>) => void
131 ): void {
132 (
ae3ab61d 133 this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
ae036c3e
JB
134 ).port1.off('message', listener)
135 }
136
afc003b2 137 /** @inheritDoc */
e102732c 138 protected createWorker (): Worker {
c97c7edb 139 return new Worker(this.filePath, {
90082c8c
JB
140 env: SHARE_ENV,
141 ...this.opts.workerOptions
4ade5f1f 142 })
c97c7edb
S
143 }
144
afc003b2 145 /** @inheritDoc */
8881ae32 146 protected get type (): PoolType {
6b27d407 147 return PoolTypes.fixed
7c0ba920
JB
148 }
149
184855e6
JB
150 /** @inheritDoc */
151 protected get worker (): WorkerType {
152 return WorkerTypes.thread
153 }
154
afc003b2 155 /** @inheritDoc */
c319c66b 156 protected get busy (): boolean {
c2ade475 157 return this.internalBusy()
7c0ba920 158 }
4ade5f1f 159}