Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 MessageChannel,
3 SHARE_ENV,
4 Worker,
5 isMainThread
6 } from 'node:worker_threads'
7 import type { Draft, MessageValue } from '../../utility-types'
8 import { AbstractPool } from '../abstract-pool'
9 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
10
11 /**
12 * A thread worker with message channels for communication between main thread and thread worker.
13 */
14 export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
15
16 /**
17 * A thread pool with a fixed number of threads.
18 *
19 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
20 *
21 * This pool selects the threads in a round robin fashion.
22 *
23 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
24 * @typeParam Response - Type of execution response. This can only be serializable data.
25 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
26 * @since 0.0.1
27 */
28 export class FixedThreadPool<
29 Data = unknown,
30 Response = unknown
31 > extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
32 /**
33 * Constructs a new poolifier fixed thread pool.
34 *
35 * @param numberOfThreads - Number of threads for this pool.
36 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
37 * @param opts - Options for this fixed thread pool.
38 */
39 public constructor (
40 numberOfThreads: number,
41 filePath: string,
42 opts: PoolOptions<ThreadWorkerWithMessageChannel> = {}
43 ) {
44 super(numberOfThreads, filePath, opts)
45 }
46
47 /** @inheritDoc */
48 protected isMain (): boolean {
49 return isMainThread
50 }
51
52 /** @inheritDoc */
53 protected async destroyWorker (
54 worker: ThreadWorkerWithMessageChannel
55 ): Promise<void> {
56 this.sendToWorker(worker, { kill: 1 })
57 await worker.terminate()
58 }
59
60 /** @inheritDoc */
61 protected sendToWorker (
62 worker: ThreadWorkerWithMessageChannel,
63 message: MessageValue<Data>
64 ): void {
65 worker.postMessage(message)
66 }
67
68 /** @inheritDoc */
69 protected registerWorkerMessageListener<Message extends Data | Response>(
70 worker: ThreadWorkerWithMessageChannel,
71 listener: (message: MessageValue<Message>) => void
72 ): void {
73 worker.port2?.on('message', listener)
74 }
75
76 /** @inheritDoc */
77 protected createWorker (): ThreadWorkerWithMessageChannel {
78 return new Worker(this.filePath, {
79 env: SHARE_ENV
80 })
81 }
82
83 /** @inheritDoc */
84 protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
85 const { port1, port2 } = new MessageChannel()
86 worker.postMessage({ parent: port1 }, [port1])
87 worker.port1 = port1
88 worker.port2 = port2
89 // Listen to worker messages.
90 this.registerWorkerMessageListener(worker, super.workerListener())
91 }
92
93 /** @inheritDoc */
94 public get type (): PoolType {
95 return PoolTypes.fixed
96 }
97
98 /** @inheritDoc */
99 protected get minSize (): number {
100 return this.numberOfWorkers
101 }
102
103 /** @inheritDoc */
104 protected get maxSize (): number {
105 return this.numberOfWorkers
106 }
107
108 /** @inheritDoc */
109 protected get full (): boolean {
110 return this.workerNodes.length >= this.numberOfWorkers
111 }
112
113 /** @inheritDoc */
114 protected get busy (): boolean {
115 return this.internalBusy()
116 }
117 }