refactor: add sanity check at getting worker info
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
85aeb3f3
JB
2 type MessageChannel,
3 type MessagePort,
fc3e6586 4 SHARE_ENV,
65d7a1c9 5 Worker,
90082c8c 6 type WorkerOptions,
65d7a1c9 7 isMainThread
fc3e6586 8} from 'node:worker_threads'
e102732c 9import type { MessageValue } from '../../utility-types'
c97c7edb 10import { AbstractPool } from '../abstract-pool'
4b628b48
JB
11import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
12import { type WorkerType, WorkerTypes } from '../worker'
4ade5f1f 13
90082c8c
JB
/**
 * Options accepted by a poolifier thread pool.
 *
 * Extends the generic pool options, specialized for `Worker` threads, with
 * thread-specific settings.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options handed to the `Worker` constructor when a pool thread is created.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
25
/**
 * A thread pool with a fixed number of threads.
 *
 * Each worker is reached through the `MessageChannel` stored in its worker
 * info: the pool side keeps `port1`, and `port2` is transferred to the worker
 * thread with the startup message.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (worker: Worker): Promise<void> {
    // Post the kill message first, while the pool-side port is still open.
    this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
    // Teardown stays tolerant of a missing channel: close what exists, then
    // terminate the thread.
    const workerInfo = this.getWorkerInfoByWorker(worker)
    workerInfo.messageChannel?.port1.close()
    workerInfo.messageChannel?.port2.close()
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
    this.getMessageChannelOrThrow(worker).port1.postMessage(message)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (worker: Worker): void {
    const port2: MessagePort = this.getMessageChannelOrThrow(worker).port2
    // The startup message goes through the worker itself (not the channel),
    // and port2 is listed in the transfer list so the worker receives it.
    worker.postMessage(
      {
        ready: false,
        workerId: worker.threadId,
        port: port2
      },
      [port2]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: Worker,
    listener: (message: MessageValue<Message>) => void
  ): void {
    this.getMessageChannelOrThrow(worker).port1.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // `env: SHARE_ENV` is the default; user supplied worker options are
    // spread afterwards and therefore take precedence.
    return new Worker(this.filePath, {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    })
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }

  /**
   * Gets the message channel stored in the worker info of the given worker.
   *
   * Replaces unchecked `as MessageChannel` assertions with a runtime guard so
   * that a missing channel fails with an explicit error instead of an opaque
   * property access on `undefined`.
   *
   * @param worker - The worker whose message channel is needed.
   * @returns The message channel of the worker.
   * @throws Error if the worker info holds no message channel.
   */
  private getMessageChannelOrThrow (worker: Worker): MessageChannel {
    const { messageChannel } = this.getWorkerInfoByWorker(worker)
    if (messageChannel == null) {
      throw new Error(
        `Worker with thread id ${worker.threadId} has no message channel`
      )
    }
    return messageChannel
  }
}