docs: add changelog entry
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
85aeb3f3
JB
2 type MessageChannel,
3 type MessagePort,
7d91a8cd 4 type TransferListItem,
c3719753 5 type Worker,
65d7a1c9 6 isMainThread
fc3e6586 7} from 'node:worker_threads'
e102732c 8import type { MessageValue } from '../../utility-types'
c97c7edb 9import { AbstractPool } from '../abstract-pool'
4b628b48
JB
10import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
11import { type WorkerType, WorkerTypes } from '../worker'
4ade5f1f 12
4ade5f1f 13/**
729c563d
S
14 * A thread pool with a fixed number of threads.
15 *
e102732c
JB
16 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
17 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
4ade5f1f
S
18 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
19 * @since 0.0.1
20 */
d3c8a1a8 21export class FixedThreadPool<
deb85c12
JB
22 Data = unknown,
23 Response = unknown
e102732c 24> extends AbstractPool<Worker, Data, Response> {
4ade5f1f 25 /**
729c563d
S
26 * Constructs a new poolifier fixed thread pool.
27 *
38e795c1
JB
28 * @param numberOfThreads - Number of threads for this pool.
29 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
30 * @param opts - Options for this fixed thread pool.
4ade5f1f
S
31 */
32 public constructor (
5c5a1fb7 33 numberOfThreads: number,
c97c7edb 34 filePath: string,
c3719753 35 protected readonly opts: PoolOptions<Worker> = {}
4ade5f1f 36 ) {
5c5a1fb7 37 super(numberOfThreads, filePath, opts)
c97c7edb 38 }
4ade5f1f 39
afc003b2 40 /** @inheritDoc */
c97c7edb
S
41 protected isMain (): boolean {
42 return isMainThread
4ade5f1f
S
43 }
44
afc003b2 45 /** @inheritDoc */
aa9eede8
JB
46 protected sendToWorker (
47 workerNodeKey: number,
7d91a8cd
JB
48 message: MessageValue<Data>,
49 transferList?: TransferListItem[]
aa9eede8 50 ): void {
fa548cda 51 this.workerNodes[workerNodeKey].messageChannel?.port1?.postMessage(
dbfa7948 52 { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
72ae84a2
JB
53 transferList
54 )
85aeb3f3
JB
55 }
56
57 /** @inheritDoc */
aa9eede8 58 protected sendStartupMessageToWorker (workerNodeKey: number): void {
75de9f41 59 const workerNode = this.workerNodes[workerNodeKey]
75de9f41
JB
60 const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
61 .port2
e9dd5b66 62 workerNode.worker.postMessage(
85aeb3f3
JB
63 {
64 ready: false,
dbfa7948 65 workerId: this.getWorkerInfo(workerNodeKey).id,
85aeb3f3
JB
66 port: port2
67 },
68 [port2]
69 )
70 }
71
72 /** @inheritDoc */
73 protected registerWorkerMessageListener<Message extends Data | Response>(
aa9eede8 74 workerNodeKey: number,
85aeb3f3
JB
75 listener: (message: MessageValue<Message>) => void
76 ): void {
fa548cda
JB
77 this.workerNodes[workerNodeKey].messageChannel?.port1?.on(
78 'message',
79 listener
80 )
4ade5f1f
S
81 }
82
ae036c3e
JB
83 /** @inheritDoc */
84 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
85 workerNodeKey: number,
86 listener: (message: MessageValue<Message>) => void
87 ): void {
fa548cda
JB
88 this.workerNodes[workerNodeKey].messageChannel?.port1?.once(
89 'message',
90 listener
91 )
ae036c3e
JB
92 }
93
94 /** @inheritDoc */
95 protected deregisterWorkerMessageListener<Message extends Data | Response>(
96 workerNodeKey: number,
97 listener: (message: MessageValue<Message>) => void
98 ): void {
fa548cda
JB
99 this.workerNodes[workerNodeKey].messageChannel?.port1?.off(
100 'message',
101 listener
102 )
ae036c3e
JB
103 }
104
afc003b2 105 /** @inheritDoc */
8881ae32 106 protected get type (): PoolType {
6b27d407 107 return PoolTypes.fixed
7c0ba920
JB
108 }
109
184855e6
JB
110 /** @inheritDoc */
111 protected get worker (): WorkerType {
112 return WorkerTypes.thread
113 }
114
afc003b2 115 /** @inheritDoc */
c319c66b 116 protected get busy (): boolean {
c2ade475 117 return this.internalBusy()
7c0ba920 118 }
4ade5f1f 119}