refactor: factor out worker node termination code
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 type TransferListItem,
5 type Worker,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { MessageValue } from '../../utility-types'
9 import { AbstractPool } from '../abstract-pool'
10 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
11 import { type WorkerType, WorkerTypes } from '../worker'
12
13 /**
14 * A thread pool with a fixed number of threads.
15 *
16 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
17 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
18 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
19 * @since 0.0.1
20 */
21 export class FixedThreadPool<
22 Data = unknown,
23 Response = unknown
24 > extends AbstractPool<Worker, Data, Response> {
25 /**
26 * Constructs a new poolifier fixed thread pool.
27 *
28 * @param numberOfThreads - Number of threads for this pool.
29 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
30 * @param opts - Options for this fixed thread pool.
31 */
32 public constructor (
33 numberOfThreads: number,
34 filePath: string,
35 protected readonly opts: PoolOptions<Worker> = {}
36 ) {
37 super(numberOfThreads, filePath, opts)
38 }
39
40 /** @inheritDoc */
41 protected isMain (): boolean {
42 return isMainThread
43 }
44
45 /** @inheritDoc */
46 protected sendToWorker (
47 workerNodeKey: number,
48 message: MessageValue<Data>,
49 transferList?: TransferListItem[]
50 ): void {
51 this.workerNodes[workerNodeKey].messageChannel?.port1?.postMessage(
52 { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
53 transferList
54 )
55 }
56
57 /** @inheritDoc */
58 protected sendStartupMessageToWorker (workerNodeKey: number): void {
59 const workerNode = this.workerNodes[workerNodeKey]
60 const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
61 .port2
62 workerNode.worker.postMessage(
63 {
64 ready: false,
65 workerId: this.getWorkerInfo(workerNodeKey).id,
66 port: port2
67 },
68 [port2]
69 )
70 }
71
72 /** @inheritDoc */
73 protected registerWorkerMessageListener<Message extends Data | Response>(
74 workerNodeKey: number,
75 listener: (message: MessageValue<Message>) => void
76 ): void {
77 this.workerNodes[workerNodeKey].messageChannel?.port1?.on(
78 'message',
79 listener
80 )
81 }
82
83 /** @inheritDoc */
84 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
85 workerNodeKey: number,
86 listener: (message: MessageValue<Message>) => void
87 ): void {
88 this.workerNodes[workerNodeKey].messageChannel?.port1?.once(
89 'message',
90 listener
91 )
92 }
93
94 /** @inheritDoc */
95 protected deregisterWorkerMessageListener<Message extends Data | Response>(
96 workerNodeKey: number,
97 listener: (message: MessageValue<Message>) => void
98 ): void {
99 this.workerNodes[workerNodeKey].messageChannel?.port1?.off(
100 'message',
101 listener
102 )
103 }
104
105 /** @inheritDoc */
106 protected get type (): PoolType {
107 return PoolTypes.fixed
108 }
109
110 /** @inheritDoc */
111 protected get worker (): WorkerType {
112 return WorkerTypes.thread
113 }
114
115 /** @inheritDoc */
116 protected get busy (): boolean {
117 return this.internalBusy()
118 }
119 }