Rename functions and methods to not use prefix underscore (#86)
[poolifier.git] / src / fixed.ts
1 /* eslint-disable @typescript-eslint/strict-boolean-expressions */
2
3 import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads'
4
/**
 * Writable, all-optional view of `T`: every property of `T` loses its `readonly`
 * modifier and becomes optional, while keeping its original value type.
 */
export type Draft<T> = { -readonly [Key in keyof T]?: T[Key] }
6
/**
 * A worker thread that can additionally carry the two ports of a `MessageChannel`
 * (`port1` / `port2`). Both ports are optional and assignable after the worker is created.
 */
export type WorkerWithMessageChannel = Draft<MessageChannel> & Worker
8
/**
 * Options accepted by the fixed thread pool constructor. Every member is optional.
 */
export interface FixedThreadPoolOptions {
  /**
   * Listener registered for the `error` event of every worker thread.
   */
  errorHandler?: (this: Worker, e: Error) => void

  /**
   * Listener registered for the `online` event of every worker thread.
   */
  onlineHandler?: (this: Worker) => void

  /**
   * Listener registered for the `exit` event of every worker thread.
   */
  exitHandler?: (this: Worker, code: number) => void

  /**
   * Value used as `maxListeners` on the event emitter that receives task responses.
   * One listener is attached per running task, so this only exists to avoid
   * useless "too many listeners" warnings.
   *
   * @default 1000
   */
  maxTasks?: number
}
29
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer.
 *
 * This pool selects the worker thread in a round robin fashion.
 *
 * @template Data Type of the data sent to a worker thread.
 * @template Response Type of the response sent back by a worker thread.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<Data = any, Response = any> {
  /** Worker threads owned by this pool, in creation order. */
  public readonly workers: WorkerWithMessageChannel[] = []
  /** Index in `workers` of the last chosen worker; it is advanced before each selection. */
  public nextWorker: number = 0

  // Worker as key and the number of tasks currently running on it as value
  /* eslint-disable @typescript-eslint/indent */
  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
    WorkerWithMessageChannel,
    number
  >()
  /* eslint-enable @typescript-eslint/indent */

  /** Id of the last submitted task; incremented for every call to `execute`. */
  protected id: number = 0

  /**
   * @param numThreads Num of threads for this worker pool.
   * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   * @throws Error when called from a worker thread or when `filePath` is empty.
   */
  public constructor (
    public readonly numThreads: number,
    public readonly filePath: string,
    public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
  ) {
    if (!isMainThread) {
      throw new Error('Cannot start a thread pool from a worker thread !!!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    for (let i = 1; i <= this.numThreads; i++) {
      this.newWorker()
    }
  }

  /**
   * Terminates every worker thread of this pool.
   *
   * All terminations are started together, so a failing one does not prevent
   * the remaining workers from being asked to terminate.
   *
   * @returns Promise that is resolved once all workers have terminated.
   */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workers.map(async worker => await worker.terminate())
    )
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified. `null` and `undefined` are sent as an empty object;
   * any other value (including `0`, `''` and `false`) is sent untouched.
   * @returns Promise that is resolved when the task is done.
   * @throws Error when no worker can be chosen or the chosen worker is not registered in the tasks map.
   */
  // eslint-disable-next-line @typescript-eslint/promise-function-async
  public execute (data: Data): Promise<Response> {
    // configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, runningTasks + 1)
    const id = ++this.id
    // The response listener is registered before the task is posted
    const res = this.internalExecute(worker, id)
    // `??` instead of `||`: falsy but meaningful inputs must not be replaced by `{}`
    worker.postMessage({ data: data ?? {}, id })
    return res
  }

  /**
   * Waits for the response message carrying the given task id on the worker's `port2`.
   *
   * When the response arrives the listener is removed and the worker's running tasks
   * counter is decremented. The promise is rejected with `message.error` when it is set,
   * otherwise it is resolved with `message.data`.
   *
   * @param worker The worker the task has been assigned to.
   * @param id The id of the task to wait for.
   * @returns Promise settled with the outcome of the task.
   */
  // eslint-disable-next-line @typescript-eslint/promise-function-async
  protected internalExecute (
    worker: WorkerWithMessageChannel,
    id: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener = (message: {
        id: number
        error?: string
        data: Response
      }): void => {
        // Responses of other tasks running on the same worker are ignored
        if (message.id !== id) return
        worker.port2?.removeListener('message', listener)
        const runningTasks = this.tasks.get(worker)
        if (runningTasks === undefined) {
          // Reject instead of throwing: an exception thrown from an event listener
          // would never reach the caller and would leave the promise pending
          reject(new Error('Worker could not be found in tasks map'))
          return
        }
        // The task is done: one task less is running on this worker
        this.tasks.set(worker, runningTasks - 1)
        if (message.error) reject(message.error)
        else resolve(message.data)
      }
      worker.port2?.on('message', listener)
    })
  }

  /**
   * Chooses the worker for the next task in a round robin fashion:
   * `nextWorker` is advanced by one, wrapping around at the end of `workers`.
   *
   * @returns The chosen worker.
   * @throws Error when the pool has no workers.
   */
  protected chooseWorker (): WorkerWithMessageChannel {
    if (this.workers.length === 0) {
      throw new Error('Cannot choose a worker, the pool has no workers')
    }
    // The modulo also brings back in range an index that is past the end of `workers`
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Creates a worker thread running `filePath`, registers the configured event handlers,
   * sets up the message channel used to receive task responses and adds the worker to the pool.
   *
   * @returns The newly created worker.
   */
  protected newWorker (): WorkerWithMessageChannel {
    const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
      env: SHARE_ENV
    })
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    this.workers.push(worker)
    const { port1, port2 } = new MessageChannel()
    // port1 is handed over to the worker thread through the transfer list
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}