fix: emit pool error if continuous task stealing on idle fails
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586 1import {
7d91a8cd 2 type TransferListItem,
c3719753 3 type Worker,
65d7a1c9 4 isMainThread
fc3e6586 5} from 'node:worker_threads'
d35e5717
JB
6import type { MessageValue } from '../../utility-types.js'
7import { AbstractPool } from '../abstract-pool.js'
8import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
9import { type WorkerType, WorkerTypes } from '../worker.js'
4ade5f1f 10
2889bd70
JB
/**
 * Options for a poolifier thread pool.
 *
 * Alias of the generic `PoolOptions` type specialized with the
 * `node:worker_threads` `Worker` class.
 */
export type ThreadPoolOptions = PoolOptions<Worker>
15
/**
 * A thread pool with a fixed number of threads.
 *
 * Every message exchanged with a worker, except the startup message, goes
 * through `port1` of the `messageChannel` attached to the worker node.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed thread pool.
   *
   * All arguments are forwarded unchanged to the `AbstractPool` constructor.
   *
   * @param numberOfThreads - Number of threads for this pool.
   * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed thread pool. Defaults to an empty object.
   * @param maximumNumberOfThreads - Optional value passed through to `AbstractPool`
   * (presumably the upper bound used by dynamic pool variants - confirm against `AbstractPool`).
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: ThreadPoolOptions = {},
    maximumNumberOfThreads?: number
  ) {
    super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return isMainThread
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // Without a message channel there is nothing to post to: silently skip.
    if (channel == null) {
      return
    }
    // The message is stamped with the id of the targeted worker before posting.
    const workerId = this.getWorkerInfo(workerNodeKey)?.id
    channel.port1.postMessage({ ...message, workerId }, transferList)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    const node = this.workerNodes[workerNodeKey]
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const workerSidePort = node.messageChannel!.port2
    // The startup message is posted on the worker itself (not on the channel)
    // and carries port2, which is also listed in the transfer list.
    node.worker.postMessage(
      {
        ready: false,
        workerId: this.getWorkerInfo(workerNodeKey)?.id,
        port: workerSidePort
      },
      [workerSidePort]
    )
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // No-op when the worker node has no message channel.
    channel?.port1.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // No-op when the worker node has no message channel.
    channel?.port1.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const channel = this.workerNodes[workerNodeKey].messageChannel
    // No-op when the worker node has no message channel.
    channel?.port1.off('message', listener)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // A fixed pool always answers false here.
    return false
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    /* noop */
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.thread
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}