764d5851d9be4d6cd9c52b2333b1348c50c803bf
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 isMainThread,
3 MessageChannel,
4 SHARE_ENV,
5 Worker
6 } from 'node:worker_threads'
7 import type { Draft, MessageValue } from '../../utility-types'
8 import { AbstractPool } from '../abstract-pool'
9 import type { PoolOptions } from '../pool'
10 import { PoolType } from '../pool'
11
12 /**
13 * A thread worker with message channels for communication between main thread and thread worker.
14 */
15 export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
16
17 /**
18 * A thread pool with a fixed number of threads.
19 *
20 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
21 *
22 * This pool selects the threads in a round robin fashion.
23 *
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
25 * @typeParam Response - Type of response of execution. This can only be serializable data.
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
27 * @since 0.0.1
28 */
29 export class FixedThreadPool<
30 Data = unknown,
31 Response = unknown
32 > extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
33 /**
34 * Constructs a new poolifier fixed thread pool.
35 *
36 * @param numberOfThreads - Number of threads for this pool.
37 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
38 * @param opts - Options for this fixed thread pool.
39 */
40 public constructor (
41 numberOfThreads: number,
42 filePath: string,
43 opts: PoolOptions<ThreadWorkerWithMessageChannel> = {}
44 ) {
45 super(numberOfThreads, filePath, opts)
46 }
47
48 /** @inheritDoc */
49 protected isMain (): boolean {
50 return isMainThread
51 }
52
53 /** @inheritDoc */
54 protected async destroyWorker (
55 worker: ThreadWorkerWithMessageChannel
56 ): Promise<void> {
57 this.sendToWorker(worker, { kill: 1 })
58 await worker.terminate()
59 }
60
61 /** @inheritDoc */
62 protected sendToWorker (
63 worker: ThreadWorkerWithMessageChannel,
64 message: MessageValue<Data>
65 ): void {
66 worker.postMessage(message)
67 }
68
69 /** @inheritDoc */
70 protected registerWorkerMessageListener<Message extends Data | Response>(
71 messageChannel: ThreadWorkerWithMessageChannel,
72 listener: (message: MessageValue<Message>) => void
73 ): void {
74 messageChannel.port2?.on('message', listener)
75 }
76
77 /** @inheritDoc */
78 protected createWorker (): ThreadWorkerWithMessageChannel {
79 return new Worker(this.filePath, {
80 env: SHARE_ENV
81 })
82 }
83
84 /** @inheritDoc */
85 protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
86 const { port1, port2 } = new MessageChannel()
87 worker.postMessage({ parent: port1 }, [port1])
88 worker.port1 = port1
89 worker.port2 = port2
90 // Listen to worker messages.
91 this.registerWorkerMessageListener(worker, super.workerListener())
92 }
93
94 /** @inheritDoc */
95 public get type (): PoolType {
96 return PoolType.FIXED
97 }
98
99 /** @inheritDoc */
100 protected get full (): boolean {
101 return this.workerNodes.length === this.numberOfWorkers
102 }
103
104 /** @inheritDoc */
105 protected get busy (): boolean {
106 return this.internalBusy()
107 }
108 }