refactor: use named export in benchmarking code
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
fc3e6586
JB
2 MessageChannel,
3 SHARE_ENV,
65d7a1c9 4 Worker,
90082c8c 5 type WorkerOptions,
65d7a1c9 6 isMainThread
fc3e6586 7} from 'node:worker_threads'
deb85c12 8import type { Draft, MessageValue } from '../../utility-types'
c97c7edb 9import { AbstractPool } from '../abstract-pool'
184855e6
JB
10import {
11 type PoolOptions,
12 type PoolType,
13 PoolTypes,
14 type WorkerType,
15 WorkerTypes
16} from '../pool'
4ade5f1f 17
90082c8c
JB
/**
 * Configuration accepted by a poolifier thread pool: the generic pool
 * options specialised for `Worker`, plus thread-specific worker options.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options for the Node.js `Worker` constructor.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
29
729c563d
S
/**
 * A Node.js `Worker` combined with the members of a `MessageChannel`
 * (wrapped in `Draft`), used for communication between the main thread
 * and the thread worker.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
34
/**
 * Thread pool whose number of threads is fixed.
 *
 * Tasks can be run in sync or asynchronous mode, whichever suits the caller.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Builds a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - How many threads this pool holds.
   * @param filePath - Relative or absolute path to a file implementing a `ThreadWorker`.
   * @param opts - Options for this fixed thread pool; an empty object when omitted.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Delegates to the flag exported by `node:worker_threads`.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    // Post the kill message first, then wait for the thread to terminate.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // The listener is attached to `port2`; when that port is not set on the
    // worker, nothing is registered.
    const listeningPort = worker.port2
    if (listeningPort != null) {
      listeningPort.on('message', listener)
    }
  }

  /** @inheritDoc */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    // `env: SHARE_ENV` comes first, so any `env` supplied through
    // `opts.workerOptions` takes precedence over it.
    const workerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    }
    return new Worker(this.filePath, workerOptions)
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const { port1: parentPort, port2: listeningPort } = new MessageChannel()
    // `port1` is posted to the worker under the `parent` key and listed in
    // the transfer list.
    worker.postMessage({ parent: parentPort }, [parentPort])
    worker.port1 = parentPort
    worker.port2 = listeningPort
    // Listen to worker messages.
    const onWorkerMessage = super.workerListener()
    this.registerWorkerMessageListener(worker, onWorkerMessage)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Minimum and maximum sizes both equal the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}