refactor: limit pool internals public exposure
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
fc3e6586
JB
2 MessageChannel,
3 SHARE_ENV,
65d7a1c9
JB
4 Worker,
5 isMainThread
fc3e6586 6} from 'node:worker_threads'
deb85c12 7import type { Draft, MessageValue } from '../../utility-types'
c97c7edb 8import { AbstractPool } from '../abstract-pool'
184855e6
JB
9import {
10 type PoolOptions,
11 type PoolType,
12 PoolTypes,
13 type WorkerType,
14 WorkerTypes
15} from '../pool'
4ade5f1f 16
729c563d
S
/**
 * A worker thread combined with the ports of a message channel, used for
 * communication between the main thread and the thread worker.
 *
 * `Draft` is declared in `../../utility-types`; this file relies on it making
 * `port1` and `port2` optional and assignable on the worker object.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
21
/**
 * Thread pool made of a fixed number of worker threads.
 *
 * Tasks can be performed in sync or asynchronous mode, whichever you prefer.
 *
 * Threads are selected by this pool in a round robin fashion.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Builds a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - How many threads this pool holds.
   * @param filePath - Relative or absolute path to a file implementing a `ThreadWorker`.
   * @param opts - Options applied to this fixed thread pool (defaults to `{}`).
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Flag exported by node:worker_threads.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    // Send the kill message first, then terminate the thread.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Nothing is registered when the worker has no `port2` attached.
    const mainPort = worker.port2
    if (mainPort != null) {
      mainPort.on('message', listener)
    }
  }

  /** @inheritDoc */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    // The worker is spawned from the pool's file path with SHARE_ENV as its env option.
    const thread: ThreadWorkerWithMessageChannel = new Worker(this.filePath, {
      env: SHARE_ENV
    })
    return thread
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const channel = new MessageChannel()
    // `port1` is posted to the worker under the `parent` key and listed in the
    // transfer list.
    worker.postMessage({ parent: channel.port1 }, [channel.port1])
    // Keep both ports on the worker object; `port2` is the one listened on.
    worker.port1 = channel.port1
    worker.port2 = channel.port2
    // Listen to worker messages.
    this.registerWorkerMessageListener(worker, super.workerListener())
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Minimum and maximum sizes are both the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get full (): boolean {
    // Full once there are at least as many worker nodes as configured workers.
    return this.workerNodes.length >= this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Delegates to the inherited `internalBusy()`.
    return this.internalBusy()
  }
}