Merge branch 'master' into elu-strategy
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
fc3e6586
JB
2 MessageChannel,
3 SHARE_ENV,
65d7a1c9 4 Worker,
90082c8c 5 type WorkerOptions,
65d7a1c9 6 isMainThread
fc3e6586 7} from 'node:worker_threads'
deb85c12 8import type { Draft, MessageValue } from '../../utility-types'
c97c7edb 9import { AbstractPool } from '../abstract-pool'
184855e6
JB
10import {
11 type PoolOptions,
12 type PoolType,
13 PoolTypes,
14 type WorkerType,
15 WorkerTypes
16} from '../pool'
4ade5f1f 17
90082c8c
JB
/**
 * Configuration accepted by a poolifier thread pool.
 *
 * Extends the generic pool options with settings specific to `node:worker_threads` workers.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options handed to the `Worker` constructor when the pool creates a thread.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
29
729c563d
S
/**
 * A thread worker carrying the message channel ports used for communication
 * between the main thread and the thread worker.
 *
 * The `port1` / `port2` properties are attached to the worker by the pool after the worker is created.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
34
/**
 * A thread pool with a fixed number of threads.
 *
 * Tasks may be performed in sync or asynchronous mode, as you prefer.
 *
 * The pool never grows or shrinks: its minimum and maximum sizes are both the configured number of workers.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Builds a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Straight pass-through of the `node:worker_threads` flag.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    // Notify the worker with a kill message first, then wait for the thread to be terminated.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    // Outgoing messages are posted on the worker itself, not on the channel ports.
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const receivingPort = worker.port2
    // Nothing to subscribe to when no port has been attached to the worker.
    if (receivingPort == null) {
      return
    }
    receivingPort.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    // `env` defaults to SHARE_ENV; user supplied worker options are spread last so they take precedence.
    const workerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    }
    return new Worker(this.filePath, workerOptions)
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const channel = new MessageChannel()
    // `port1` is sent to the worker as `parent` and listed in the transfer list.
    worker.postMessage({ parent: channel.port1 }, [channel.port1])
    // Keep both ports on the worker object; `port2` is the one listened on below.
    worker.port1 = channel.port1
    worker.port2 = channel.port2
    // Listen to worker messages.
    this.registerWorkerMessageListener(worker, super.workerListener())
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Fixed pool: the lower bound is the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Fixed pool: the upper bound equals the lower bound.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}