perf: optimize worker node lookup by worker id
[poolifier.git] / src / pools / thread / fixed.ts
... / ...
CommitLineData
1import {
2 type MessageChannel,
3 type MessagePort,
4 SHARE_ENV,
5 Worker,
6 type WorkerOptions,
7 isMainThread
8} from 'node:worker_threads'
9import type { MessageValue } from '../../utility-types'
10import { AbstractPool } from '../abstract-pool'
11import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
12import { type WorkerType, WorkerTypes } from '../worker'
13
/**
 * Configuration accepted by a poolifier thread pool.
 *
 * Extends the generic pool options with settings that only apply to
 * `node:worker_threads` workers.
 */
export interface ThreadPoolOptions extends PoolOptions<Worker> {
  /**
   * Options forwarded to the `Worker` constructor when a thread is created.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
}
25
/**
 * Thread pool whose number of threads is fixed.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Builds a new poolifier fixed thread pool.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: ThreadPoolOptions = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Delegates to the `node:worker_threads` flag.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
    const node = this.workerNodes[workerNodeKey]
    const { worker } = node
    // Send the kill message over the channel before the channel is closed
    // and the thread is terminated.
    const killMessage = { kill: true, workerId: worker.threadId }
    this.sendToWorker(workerNodeKey, killMessage)
    node.closeChannel()
    await worker.terminate()
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>
  ): void {
    // port1 is the end used on this side; port2 is transferred to the worker
    // by the startup message.
    const channel = this.getWorkerInfo(workerNodeKey)
      .messageChannel as MessageChannel
    channel.port1.postMessage(message)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const { worker } = this.workerNodes[workerNodeKey]
    const channel = this.getWorkerInfo(workerNodeKey)
      .messageChannel as MessageChannel
    const workerPort: MessagePort = channel.port2
    const startupMessage = {
      ready: false,
      workerId: worker.threadId,
      port: workerPort
    }
    // The port is listed in the transfer list so the worker receives it.
    worker.postMessage(startupMessage, [workerPort])
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Listen on port1, the same end that sendToWorker posts on.
    const channel = this.getWorkerInfo(workerNodeKey)
      .messageChannel as MessageChannel
    channel.port1.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // User-supplied worker options are spread last, so they can override
    // the default shared environment.
    const workerOptions: WorkerOptions = {
      env: SHARE_ENV,
      ...this.opts.workerOptions
    }
    return new Worker(this.filePath, workerOptions)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // Same value as maxSize.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // Same value as minSize.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}