Jerome info
[poolifier.git] / src / fixed.ts
CommitLineData
ee99693b
S
1/* eslint-disable @typescript-eslint/strict-boolean-expressions */
2
f045358d 3import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads'
4ade5f1f 4
/**
 * No-op callback, used as the fallback when an optional worker event handler
 * (`errorHandler`, `onlineHandler`, `exitHandler`) is not provided.
 */
function empty (): void {
  // intentionally does nothing
}
4ade5f1f
S
// Placeholder payload: `execute` posts this object to the worker in place of a falsy `data` argument.
const _void = {}
7
/**
 * Mapped type that makes every property of `T` both optional and writable
 * (any `readonly` modifier is stripped).
 */
export type Draft<T> = { -readonly [Key in keyof T]?: T[Key] }
9
/**
 * A `Worker` that may additionally carry the members of a `MessageChannel`
 * (`port1` / `port2`), each optional and writable so the pool can assign them
 * after the worker has been created.
 */
export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
11
/**
 * Options accepted by the `FixedThreadPool` constructor.
 */
export interface FixedThreadPoolOptions {
  /**
   * A function registered as listener for the `error` event of each worker thread.
   */
  errorHandler?: (this: Worker, e: Error) => void
  /**
   * A function registered as listener for the `online` event of each worker thread.
   */
  onlineHandler?: (this: Worker) => void
  /**
   * A function registered as listener for the `exit` event of each worker thread.
   */
  exitHandler?: (this: Worker, code: number) => void
  /**
   * Value passed to `setMaxListeners` on each worker's message port.
   *
   * The pool attaches one `message` listener per in-flight task, so this only
   * exists to avoid spurious max-listeners warnings from the event emitter.
   *
   * @default 1000
   */
  maxTasks?: number
}
32
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode as you prefer.
 *
 * This pool selects the worker thread in a round robin fashion.
 *
 * @template Data Type of the payload handed to a worker by `execute`.
 * @template Response Type of the value a task resolves with.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export default class FixedThreadPool<Data = any, Response = any> {
  /** Workers owned by this pool, in creation order. */
  public readonly workers: WorkerWithMessageChannel[] = []
  /** Index in `workers` of the most recently chosen worker. */
  public nextWorker: number = 0

  /**
   * Number of tasks currently in flight on each worker: incremented by
   * `execute`, decremented when the matching response message arrives.
   */
  /* eslint-disable @typescript-eslint/indent */
  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
    WorkerWithMessageChannel,
    number
  >()
  /* eslint-enable @typescript-eslint/indent */

  /** Id of the most recently submitted task; used to match responses to requests. */
  protected _id: number = 0

  /**
   * @param numThreads Num of threads for this worker pool.
   * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   * @throws {Error} When called from a worker thread, or when `filePath` is empty.
   */
  public constructor (
    public readonly numThreads: number,
    public readonly filePath: string,
    public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
  ) {
    if (!isMainThread) {
      throw new Error('Cannot start a thread pool from a worker thread !!!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    for (let i = 1; i <= this.numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminates every worker of the pool, one after the other.
   *
   * @returns Promise that is resolved once all workers have been terminated.
   */
  public async destroy (): Promise<void> {
    for (const worker of this.workers) {
      await worker.terminate()
    }
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified.
   * @returns Promise that is resolved when the task is done.
   * @throws {Error} When the chosen worker has no entry in the `tasks` map.
   */
  // eslint-disable-next-line @typescript-eslint/promise-function-async
  public execute (data: Data): Promise<Response> {
    const worker = this._chooseWorker()
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw Error('Worker could not be found in tasks map')
    }
    // one more task in flight on this worker
    this.tasks.set(worker, runningTasks + 1)

    const id = ++this._id
    // register the response listener before posting, so the reply cannot be missed
    const res = this._execute(worker, id)
    // Any falsy payload (undefined, 0, '', false, ...) is replaced by an empty object.
    // NOTE(review): presumably the worker side requires a truthy `data` - confirm before switching to `??`.
    worker.postMessage({ data: data || _void, _id: id })
    return res
  }

  /**
   * Waits for the response message that carries the given task id.
   *
   * @param worker The worker the task was posted to.
   * @param id Id of the task whose response is awaited.
   * @returns Promise resolved with the response `data`, or rejected with the
   * response `error`. It is also rejected when the worker is no longer present
   * in the `tasks` map at the time the response arrives.
   */
  // eslint-disable-next-line @typescript-eslint/promise-function-async
  protected _execute (
    worker: WorkerWithMessageChannel,
    id: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener = (message: {
        _id: number
        error?: string
        data: Response
      }): void => {
        // every in-flight task listens on the same port; ignore other tasks' responses
        if (message._id !== id) return
        worker.port2?.removeListener('message', listener)

        const runningTasks = this.tasks.get(worker)
        if (runningTasks === undefined) {
          // Reject rather than throw: an exception thrown from an event listener
          // would never reach the caller and would leave this promise pending.
          reject(new Error('Worker could not be found in tasks map'))
          return
        }
        // the task is finished: one task less in flight on this worker
        this.tasks.set(worker, runningTasks - 1)

        if (message.error) reject(message.error)
        else resolve(message.data)
      }
      worker.port2?.on('message', listener)
    })
  }

  /**
   * Picks the next worker in round robin order: advances `nextWorker`,
   * wrapping to index 0 after the last worker.
   *
   * @returns The worker at the new `nextWorker` index.
   */
  protected _chooseWorker (): WorkerWithMessageChannel {
    this.nextWorker =
      this.workers.length - 1 === this.nextWorker ? 0 : this.nextWorker + 1
    return this.workers[this.nextWorker]
  }

  /**
   * Creates a worker running `filePath`, attaches the configured event
   * handlers, hands it one end of a new `MessageChannel` and registers it in
   * `workers` and `tasks`.
   *
   * @returns The newly created worker.
   */
  protected _newWorker (): WorkerWithMessageChannel {
    const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
      env: SHARE_ENV
    })
    worker.on('error', this.opts.errorHandler ?? empty)
    worker.on('online', this.opts.onlineHandler ?? empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler ?? empty)
    this.workers.push(worker)
    // port1 is transferred to the worker, port2 stays on the pool side
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
164
// CommonJS export: makes `require()` of this module return the class itself.
module.exports = FixedThreadPool