build(deps): bump poolifier
[poolifier.git] / src / pools / thread / fixed.ts
... / ...
CommitLineData
1import {
2 isMainThread,
3 type TransferListItem,
4 type Worker
5} from 'node:worker_threads'
6
7import type { MessageValue } from '../../utility-types.js'
8import { AbstractPool } from '../abstract-pool.js'
9import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
10import { type WorkerType, WorkerTypes } from '../worker.js'
11
12/**
13 * Options for a poolifier thread pool.
14 */
15export type ThreadPoolOptions = PoolOptions<Worker>
16
17/**
18 * A thread pool with a fixed number of threads.
19 *
20 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
22 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
23 * @since 0.0.1
24 */
25export class FixedThreadPool<
26 Data = unknown,
27 Response = unknown
28> extends AbstractPool<Worker, Data, Response> {
29 /**
30 * Constructs a new poolifier fixed thread pool.
31 *
32 * @param numberOfThreads - Number of threads for this pool.
33 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
34 * @param opts - Options for this fixed thread pool.
35 */
36 public constructor (
37 numberOfThreads: number,
38 filePath: string,
39 opts: ThreadPoolOptions = {},
40 maximumNumberOfThreads?: number
41 ) {
42 super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
43 }
44
45 /** @inheritDoc */
46 protected isMain (): boolean {
47 return isMainThread
48 }
49
50 /** @inheritDoc */
51 protected sendToWorker (
52 workerNodeKey: number,
53 message: MessageValue<Data>,
54 transferList?: readonly TransferListItem[]
55 ): void {
56 this.workerNodes[workerNodeKey]?.messageChannel?.port1.postMessage(
57 {
58 ...message,
59 workerId: this.getWorkerInfo(workerNodeKey)?.id
60 } satisfies MessageValue<Data>,
61 transferList
62 )
63 }
64
65 /** @inheritDoc */
66 protected sendStartupMessageToWorker (workerNodeKey: number): void {
67 const workerNode = this.workerNodes[workerNodeKey]
68 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
69 const port2 = workerNode.messageChannel!.port2
70 workerNode.worker.postMessage(
71 {
72 ready: false,
73 workerId: this.getWorkerInfo(workerNodeKey)?.id,
74 port: port2
75 } satisfies MessageValue<Data>,
76 [port2]
77 )
78 }
79
80 /** @inheritDoc */
81 protected registerWorkerMessageListener<Message extends Data | Response>(
82 workerNodeKey: number,
83 listener: (message: MessageValue<Message>) => void
84 ): void {
85 this.workerNodes[workerNodeKey].messageChannel?.port1.on(
86 'message',
87 listener
88 )
89 }
90
91 /** @inheritDoc */
92 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
93 workerNodeKey: number,
94 listener: (message: MessageValue<Message>) => void
95 ): void {
96 this.workerNodes[workerNodeKey].messageChannel?.port1.once(
97 'message',
98 listener
99 )
100 }
101
102 /** @inheritDoc */
103 protected deregisterWorkerMessageListener<Message extends Data | Response>(
104 workerNodeKey: number,
105 listener: (message: MessageValue<Message>) => void
106 ): void {
107 this.workerNodes[workerNodeKey].messageChannel?.port1.off(
108 'message',
109 listener
110 )
111 }
112
113 /** @inheritDoc */
114 protected shallCreateDynamicWorker (): boolean {
115 return false
116 }
117
118 /** @inheritDoc */
119 protected checkAndEmitDynamicWorkerCreationEvents (): void {
120 /* noop */
121 }
122
123 /** @inheritDoc */
124 protected get type (): PoolType {
125 return PoolTypes.fixed
126 }
127
128 /** @inheritDoc */
129 protected get worker (): WorkerType {
130 return WorkerTypes.thread
131 }
132
133 /** @inheritDoc */
134 protected get busy (): boolean {
135 return this.internalBusy()
136 }
137}