chore: cleanup `@std/expect` imports
[poolifier.git] / thread / fixed.ts
... / ...
CommitLineData
1import {
2 type MessageChannel,
3 type MessagePort,
4 SHARE_ENV,
5 type TransferListItem,
6 Worker,
7 type WorkerOptions,
8 isMainThread
9} from 'node:worker_threads'
10import type { MessageValue } from '../../utility-types'
11import { AbstractPool } from '../abstract-pool'
12import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
13import { type WorkerType, WorkerTypes } from '../worker'
14
15/**
16 * Options for a poolifier thread pool.
17 */
18export interface ThreadPoolOptions extends PoolOptions<Worker> {
19 /**
20 * Worker options.
21 *
22 * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
23 */
24 workerOptions?: WorkerOptions
25}
26
27/**
28 * A thread pool with a fixed number of threads.
29 *
30 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
31 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
32 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
33 * @since 0.0.1
34 */
35export class FixedThreadPool<
36 Data = unknown,
37 Response = unknown
38> extends AbstractPool<Worker, Data, Response> {
39 /**
40 * Constructs a new poolifier fixed thread pool.
41 *
42 * @param numberOfThreads - Number of threads for this pool.
43 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
44 * @param opts - Options for this fixed thread pool.
45 */
46 public constructor (
47 numberOfThreads: number,
48 filePath: string,
49 protected readonly opts: ThreadPoolOptions = {}
50 ) {
51 super(numberOfThreads, filePath, opts)
52 }
53
54 /** @inheritDoc */
55 protected isMain (): boolean {
56 return isMainThread
57 }
58
59 /** @inheritDoc */
60 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
61 this.flushTasksQueue(workerNodeKey)
62 // FIXME: wait for tasks to be finished
63 const workerNode = this.workerNodes[workerNodeKey]
64 const worker = workerNode.worker
65 const waitWorkerExit = new Promise<void>((resolve) => {
66 worker.on('exit', () => {
67 resolve()
68 })
69 })
70 await this.sendKillMessageToWorker(
71 workerNodeKey,
72 workerNode.info.id as number
73 )
74 workerNode.closeChannel()
75 await worker.terminate()
76 await waitWorkerExit
77 }
78
79 /** @inheritDoc */
80 protected sendToWorker (
81 workerNodeKey: number,
82 message: MessageValue<Data>,
83 transferList?: TransferListItem[]
84 ): void {
85 (
86 this.workerNodes[workerNodeKey].messageChannel as MessageChannel
87 ).port1.postMessage(message, transferList)
88 }
89
90 /** @inheritDoc */
91 protected sendStartupMessageToWorker (workerNodeKey: number): void {
92 const workerNode = this.workerNodes[workerNodeKey]
93 const worker = workerNode.worker
94 const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
95 .port2
96 worker.postMessage(
97 {
98 ready: false,
99 workerId: workerNode.info.id,
100 port: port2
101 },
102 [port2]
103 )
104 }
105
106 /** @inheritDoc */
107 protected registerWorkerMessageListener<Message extends Data | Response>(
108 workerNodeKey: number,
109 listener: (message: MessageValue<Message>) => void
110 ): void {
111 (
112 this.workerNodes[workerNodeKey].messageChannel as MessageChannel
113 ).port1.on('message', listener)
114 }
115
116 /** @inheritDoc */
117 protected createWorker (): Worker {
118 return new Worker(this.filePath, {
119 env: SHARE_ENV,
120 ...this.opts.workerOptions
121 })
122 }
123
124 /** @inheritDoc */
125 protected get type (): PoolType {
126 return PoolTypes.fixed
127 }
128
129 /** @inheritDoc */
130 protected get worker (): WorkerType {
131 return WorkerTypes.thread
132 }
133
134 /** @inheritDoc */
135 protected get busy (): boolean {
136 return this.internalBusy()
137 }
138}