fix: ensure worker choice is retried at least the pool max size
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 type TransferListItem,
5 type Worker,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { MessageValue } from '../../utility-types'
9 import { AbstractPool } from '../abstract-pool'
10 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
11 import { type WorkerType, WorkerTypes } from '../worker'
12
/**
 * Thread pool whose number of worker threads is fixed.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   * @param maximumNumberOfThreads - Optional value forwarded unchanged to the base pool constructor.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: PoolOptions<Worker> = {},
    maximumNumberOfThreads?: number
  ) {
    // All arguments are handed straight to the abstract pool; nothing else is set up here.
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Delegates to the `isMainThread` flag exported by `node:worker_threads`.
    return isMainThread
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const poolSidePort =
      this.workerNodes[workerNodeKey].messageChannel?.port1
    // Without a channel/port there is nothing to post to: bail out before
    // building the payload, so the worker info lookup is skipped as well.
    if (poolSidePort == null) {
      return
    }
    // The outgoing message is a shallow copy of `message` stamped with the worker id.
    poolSidePort.postMessage(
      { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
      transferList
    )
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const targetNode = this.workerNodes[workerNodeKey]
    // The node's message channel is asserted to exist here; its second port is
    // the one shipped to the worker.
    const workerSidePort: MessagePort = (
      targetNode.messageChannel as MessageChannel
    ).port2
    const startupMessage = {
      ready: false,
      workerId: this.getWorkerInfo(workerNodeKey).id,
      port: workerSidePort
    }
    // The port is both referenced by the message and listed as a transferable.
    targetNode.worker.postMessage(startupMessage, [workerSidePort])
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Subscribes on port1 of the node's channel; no-op when the channel is absent.
    const poolSidePort =
      this.workerNodes[workerNodeKey].messageChannel?.port1
    poolSidePort?.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // One-shot subscription on port1 of the node's channel; no-op when the channel is absent.
    const poolSidePort =
      this.workerNodes[workerNodeKey].messageChannel?.port1
    poolSidePort?.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Removes the listener from port1 of the node's channel; no-op when the channel is absent.
    const poolSidePort =
      this.workerNodes[workerNodeKey].messageChannel?.port1
    poolSidePort?.off('message', listener)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    // This pool always reports the fixed pool type.
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    // Workers of this pool are threads.
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Busy state is whatever the base pool's `internalBusy()` computes.
    return this.internalBusy()
  }
}