Commit | Line | Data |
---|---|---|
fc3e6586 | 1 | import { |
85aeb3f3 | 2 | type MessagePort, |
7d91a8cd | 3 | type TransferListItem, |
c3719753 | 4 | type Worker, |
65d7a1c9 | 5 | isMainThread |
fc3e6586 | 6 | } from 'node:worker_threads' |
d35e5717 JB |
7 | import type { MessageValue } from '../../utility-types.js' |
8 | import { AbstractPool } from '../abstract-pool.js' | |
9 | import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js' | |
10 | import { type WorkerType, WorkerTypes } from '../worker.js' | |
4ade5f1f | 11 | |
/**
 * Options for a poolifier thread pool.
 *
 * Alias of the generic pool options specialized for `node:worker_threads` `Worker` instances.
 */
export type ThreadPoolOptions = PoolOptions<Worker>
16 | ||
/**
 * A thread pool with a fixed number of threads.
 *
 * Pool messages are exchanged with each worker over the `port1` side of the
 * worker node message channel; the `port2` side is handed over to the worker
 * in the startup message.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * All arguments are passed through unchanged to the parent pool constructor.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   * @param maximumNumberOfThreads - Optional maximum number of threads, forwarded as is to the parent constructor.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: ThreadPoolOptions = {},
    maximumNumberOfThreads?: number
  ) {
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // Without a port there is nothing to send to: skip silently, and do not
    // look up the worker info either.
    if (poolPort == null) {
      return
    }
    // Tag the outgoing message with the id of the targeted worker.
    const taggedMessage = {
      ...message,
      workerId: this.getWorkerInfo(workerNodeKey).id
    }
    poolPort.postMessage(taggedMessage, transferList)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const node = this.workerNodes[workerNodeKey]
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const workerPort: MessagePort = node.messageChannel!.port2
    // The port is both part of the message and listed in the transfer list,
    // so that it is transferred to the worker along with the startup message.
    node.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey).id,
        port: workerPort
      },
      [workerPort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // No-op when the worker node has no port to listen on.
    poolPort?.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // No-op when the worker node has no port to listen on.
    poolPort?.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const poolPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    // No-op when the worker node has no port to detach from.
    poolPort?.off('message', listener)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // This pool implementation never asks for a dynamic worker.
    return false
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    /* noop */
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Busy state is delegated to the inherited `internalBusy()` helper.
    return this.internalBusy()
  }
}