Eslint configuration refinement (#97)
[poolifier.git] / src / fixed.ts
1 import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
2
3 export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
4
5 export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
6
7 export interface FixedThreadPoolOptions {
8 /**
9 * A function that will listen for error event on each worker thread.
10 */
11 errorHandler?: (this: Worker, e: Error) => void
12 /**
13 * A function that will listen for online event on each worker thread.
14 */
15 onlineHandler?: (this: Worker) => void
16 /**
17 * A function that will listen for exit event on each worker thread.
18 */
19 exitHandler?: (this: Worker, code: number) => void
20 /**
21 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
22 *
23 * @default 1000
24 */
25 maxTasks?: number
26 }
27
28 /**
29 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
30 *
31 * This pool will select the worker thread in a round robin fashion.
32 *
33 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
34 * @since 0.0.1
35 */
36 // eslint-disable-next-line @typescript-eslint/no-explicit-any
37 export class FixedThreadPool<Data = any, Response = any> {
38 public readonly workers: WorkerWithMessageChannel[] = []
39 public nextWorker: number = 0
40
41 // threadId as key and an integer value
42 public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
43 WorkerWithMessageChannel,
44 number
45 >()
46
47 protected id: number = 0
48
49 /**
50 * @param numThreads Num of threads for this worker pool.
51 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
52 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
53 */
54 public constructor (
55 public readonly numThreads: number,
56 public readonly filePath: string,
57 public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
58 ) {
59 if (!isMainThread) {
60 throw new Error('Cannot start a thread pool from a worker thread !!!')
61 }
62 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
63 if (!this.filePath) {
64 throw new Error('Please specify a file with a worker implementation')
65 }
66
67 for (let i = 1; i <= this.numThreads; i++) {
68 this.newWorker()
69 }
70 }
71
72 public async destroy (): Promise<void> {
73 for (const worker of this.workers) {
74 await worker.terminate()
75 }
76 }
77
78 /**
79 * Execute the task specified into the constructor with the data parameter.
80 *
81 * @param data The input for the task specified.
82 * @returns Promise that is resolved when the task is done.
83 */
84 public execute (data: Data): Promise<Response> {
85 // configure worker to handle message with the specified task
86 const worker = this.chooseWorker()
87 const previousWorkerIndex = this.tasks.get(worker)
88 if (previousWorkerIndex !== undefined) {
89 this.tasks.set(worker, previousWorkerIndex + 1)
90 } else {
91 throw Error('Worker could not be found in tasks map')
92 }
93 const id = ++this.id
94 const res = this.internalExecute(worker, id)
95 worker.postMessage({ data: data || {}, id: id })
96 return res
97 }
98
99 protected internalExecute (
100 worker: WorkerWithMessageChannel,
101 id: number
102 ): Promise<Response> {
103 return new Promise((resolve, reject) => {
104 const listener = (message: {
105 id: number
106 error?: string
107 data: Response
108 }): void => {
109 if (message.id === id) {
110 worker.port2?.removeListener('message', listener)
111 const previousWorkerIndex = this.tasks.get(worker)
112 if (previousWorkerIndex !== undefined) {
113 this.tasks.set(worker, previousWorkerIndex + 1)
114 } else {
115 throw Error('Worker could not be found in tasks map')
116 }
117 if (message.error) reject(message.error)
118 else resolve(message.data)
119 }
120 }
121 worker.port2?.on('message', listener)
122 })
123 }
124
125 protected chooseWorker (): WorkerWithMessageChannel {
126 if (this.workers.length - 1 === this.nextWorker) {
127 this.nextWorker = 0
128 return this.workers[this.nextWorker]
129 } else {
130 this.nextWorker++
131 return this.workers[this.nextWorker]
132 }
133 }
134
135 protected newWorker (): WorkerWithMessageChannel {
136 const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
137 env: SHARE_ENV
138 })
139 worker.on('error', this.opts.errorHandler ?? (() => {}))
140 worker.on('online', this.opts.onlineHandler ?? (() => {}))
141 // TODO handle properly when a thread exit
142 worker.on('exit', this.opts.exitHandler ?? (() => {}))
143 this.workers.push(worker)
144 const { port1, port2 } = new MessageChannel()
145 worker.postMessage({ parent: port1 }, [port1])
146 worker.port1 = port1
147 worker.port2 = port2
148 // we will attach a listener for every task,
149 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
150 worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
151 // init tasks map
152 this.tasks.set(worker, 0)
153 return worker
154 }
155 }