refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / thread / fixed.ts
1 import {
2 type MessageChannel,
3 type MessagePort,
4 type TransferListItem,
5 type Worker,
6 isMainThread
7 } from 'node:worker_threads'
8 import type { MessageValue } from '../../utility-types'
9 import { AbstractPool } from '../abstract-pool'
10 import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
11 import { type WorkerType, WorkerTypes } from '../worker'
12
/**
 * A thread pool with a fixed number of threads.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * All three arguments are forwarded unchanged to the parent pool constructor.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty options object.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: PoolOptions<Worker> = {}
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Value exported by node:worker_threads.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
    this.flagWorkerNodeAsNotReady(workerNodeKey)
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    const node = this.workerNodes[workerNodeKey]
    // The one-shot 'exit' handler is registered before the kill message is sent.
    const exited = new Promise<void>(resolve => {
      node.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    await this.sendKillMessageToWorker(workerNodeKey)
    node.closeChannel()
    node.removeAllListeners()
    await node.worker.terminate()
    // Only return once the 'exit' handler registered above has fired.
    await exited
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    if (mainPort == null) {
      // No channel (or no port1) on this worker node: nothing is sent.
      return
    }
    // The message is copied and stamped with the target worker id.
    const outgoing = {
      ...message,
      workerId: this.getWorkerInfo(workerNodeKey).id
    }
    mainPort.postMessage(outgoing, transferList)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const node = this.workerNodes[workerNodeKey]
    const workerSidePort: MessagePort = (node.messageChannel as MessageChannel)
      .port2
    // port2 is both referenced in the payload and listed in the transfer list.
    node.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey).id,
        port: workerSidePort
      },
      [workerSidePort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    if (mainPort == null) {
      return
    }
    mainPort.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    if (mainPort == null) {
      return
    }
    mainPort.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    if (mainPort == null) {
      return
    }
    mainPort.off('message', listener)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}