refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
85aeb3f3
JB
2 type MessageChannel,
3 type MessagePort,
7d91a8cd 4 type TransferListItem,
c3719753 5 type Worker,
65d7a1c9 6 isMainThread
fc3e6586 7} from 'node:worker_threads'
e102732c 8import type { MessageValue } from '../../utility-types'
c97c7edb 9import { AbstractPool } from '../abstract-pool'
4b628b48
JB
10import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
11import { type WorkerType, WorkerTypes } from '../worker'
4ade5f1f 12
/**
 * Thread-based pool whose number of worker threads is fixed.
 *
 * @typeParam Data - Type of the data sent to the worker. Must be structured-cloneable.
 * @typeParam Response - Type of the execution response. Must be structured-cloneable.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Creates a poolifier fixed thread pool.
   *
   * @param numberOfThreads - How many threads the pool holds.
   * @param filePath - Relative or absolute path to a file implementing a `ThreadWorker`.
   * @param opts - Options applied to this fixed thread pool.
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    protected readonly opts: PoolOptions<Worker> = {}
  ) {
    // Everything is forwarded unchanged to the abstract pool constructor.
    super(numberOfThreads, filePath, opts)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    // Straight passthrough of the `node:worker_threads` flag.
    return isMainThread
  }

  /** @inheritDoc */
  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
    this.flagWorkerNodeAsNotReady(workerNodeKey)
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    const node = this.workerNodes[workerNodeKey]
    // The one-shot 'exit' handler is registered before the kill message is
    // sent and before terminate() is called; this method only completes once
    // that handler has fired.
    const workerExited = new Promise<void>(resolve => {
      node.registerOnceWorkerEventHandler('exit', () => {
        resolve()
      })
    })
    await this.sendKillMessageToWorker(workerNodeKey)
    node.closeChannel()
    node.removeAllListeners()
    await node.worker.terminate()
    await workerExited
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    // Messages go out through port1 of the node's message channel; when the
    // channel or the port is absent, nothing is sent and nothing is evaluated.
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    mainPort?.postMessage(
      { ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
      transferList
    )
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const node = this.workerNodes[workerNodeKey]
    const channel = node.messageChannel as MessageChannel
    const workerPort: MessagePort = channel.port2
    // port2 is both referenced in the payload and listed in the transfer
    // list handed to the worker's postMessage().
    node.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey).id,
        port: workerPort
      },
      [workerPort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Attaches `listener` to 'message' events on port1, if that port exists.
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    mainPort?.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Same as the persistent variant, but registered with once().
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    mainPort?.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Detaches `listener` from 'message' events on port1, if that port exists.
    const mainPort = this.workerNodes[workerNodeKey].messageChannel?.port1
    mainPort?.off('message', listener)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Delegates to the inherited busy computation.
    return this.internalBusy()
  }
}