feat: expose worker type in pool information
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 MessageChannel,
3 SHARE_ENV,
4 Worker,
5 isMainThread
6 } from 'node:worker_threads'
7 import type { Draft, MessageValue } from '../../utility-types'
8 import { AbstractPool } from '../abstract-pool'
9 import {
10 type PoolOptions,
11 type PoolType,
12 PoolTypes,
13 type WorkerType,
14 WorkerTypes
15 } from '../pool'
16
/**
 * A worker thread that also carries the ports of a message channel, used for
 * communication between the main thread and the thread worker.
 *
 * The `port1` and `port2` properties are assigned by the pool after the worker
 * has been created, so they may be absent on a freshly constructed worker.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
21
/**
 * A thread pool whose number of threads is fixed: its minimum and maximum
 * sizes are both the number of workers it was constructed with.
 *
 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
 *
 * This pool selects the threads in a round robin fashion.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = {}
  ) {
    // All arguments are forwarded unchanged to the abstract pool.
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Backed directly by the `isMainThread` flag of `node:worker_threads`.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    // Notify the worker with a kill message first, then terminate its thread.
    this.sendToWorker(worker, { kill: 1 })
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    // Messages to the worker go through the worker's own `postMessage`.
    worker.postMessage(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Messages are received on the second channel port, which only exists
    // once it has been attached to the worker; without it nothing is registered.
    const receivingPort = worker.port2
    if (receivingPort != null) {
      receivingPort.on('message', listener)
    }
  }

  /** @inheritDoc */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    // `SHARE_ENV` is passed as the worker's `env` option.
    return new Worker(this.filePath, {
      env: SHARE_ENV
    })
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const channel = new MessageChannel()
    const workerSidePort = channel.port1
    const poolSidePort = channel.port2
    // The first port is handed to the worker as `parent` and listed in the
    // transfer list of the message.
    worker.postMessage({ parent: workerSidePort }, [workerSidePort])
    // Both ports are recorded on the worker object itself.
    worker.port1 = workerSidePort
    worker.port2 = poolSidePort
    // Listen to worker messages.
    this.registerWorkerMessageListener(worker, super.workerListener())
  }

  /** @inheritDoc */
  public get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Fixed pool: the lower bound is the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Fixed pool: the upper bound is the configured number of workers.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get full (): boolean {
    // Full once the number of worker nodes reaches the configured worker count.
    return this.numberOfWorkers <= this.workerNodes.length
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}