Merge most of the stuff from jerome-benoit:issue-62-ts-standard
[poolifier.git] / src / fixed.ts
1 import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads'
2
3 function empty () {}
4 const _void = {}
5
6 export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
7
8 export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
9
10 export interface FixedThreadPoolOptions {
11 /**
12 * A function that will listen for error event on each worker thread.
13 */
14 errorHandler?: (this: Worker, e: Error) => void
15 /**
16 * A function that will listen for online event on each worker thread.
17 */
18 onlineHandler?: (this: Worker) => void
19 /**
20 * A function that will listen for exit event on each worker thread.
21 */
22 exitHandler?: (this: Worker, code: number) => void
23 /**
24 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
25 *
26 * @default 1000
27 */
28 maxTasks?: number
29 }
30
31 /**
32 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
33 *
34 * This pool will select the worker thread in a round robin fashion.
35 *
36 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
37 * @since 0.0.1
38 */
39 export default class FixedThreadPool<Data = any, Response = any> {
40 public readonly workers: WorkerWithMessageChannel[] = []
41 public nextWorker: number = 0
42
43 // threadId as key and an integer value
44 public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
45 WorkerWithMessageChannel,
46 number
47 >()
48
49 protected _id: number = 0
50
51 /**
52 * @param numThreads Num of threads for this worker pool.
53 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
54 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
55 */
56 public constructor (
57 public readonly numThreads: number,
58 public readonly filePath: string,
59 public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
60 ) {
61 if (!isMainThread) {
62 throw new Error('Cannot start a thread pool from a worker thread !!!')
63 }
64 if (!this.filePath) {
65 throw new Error('Please specify a file with a worker implementation')
66 }
67
68 for (let i = 1; i <= this.numThreads; i++) {
69 this._newWorker()
70 }
71 }
72
73 public async destroy (): Promise<void> {
74 for (const worker of this.workers) {
75 await worker.terminate()
76 }
77 }
78
79 /**
80 * Execute the task specified into the constructor with the data parameter.
81 *
82 * @param data The input for the task specified.
83 * @returns Promise that is resolved when the task is done.
84 */
85 public async execute (data: Data): Promise<Response> {
86 // configure worker to handle message with the specified task
87 const worker = this._chooseWorker()
88 this.tasks.set(worker, this.tasks.get(worker) + 1)
89 const id = ++this._id
90 const res = this._execute(worker, id)
91 worker.postMessage({ data: data || _void, _id: id })
92 return res
93 }
94
95 protected _execute (
96 worker: WorkerWithMessageChannel,
97 id: number
98 ): Promise<Response> {
99 return new Promise((resolve, reject) => {
100 const listener = (message) => {
101 if (message._id === id) {
102 worker.port2.removeListener('message', listener)
103 this.tasks.set(worker, this.tasks.get(worker) - 1)
104 if (message.error) reject(message.error)
105 else resolve(message.data)
106 }
107 }
108 worker.port2.on('message', listener)
109 })
110 }
111
112 protected _chooseWorker (): WorkerWithMessageChannel {
113 if (this.workers.length - 1 === this.nextWorker) {
114 this.nextWorker = 0
115 return this.workers[this.nextWorker]
116 } else {
117 this.nextWorker++
118 return this.workers[this.nextWorker]
119 }
120 }
121
122 protected _newWorker (): WorkerWithMessageChannel {
123 const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
124 env: SHARE_ENV
125 })
126 worker.on('error', this.opts.errorHandler || empty)
127 worker.on('online', this.opts.onlineHandler || empty)
128 // TODO handle properly when a thread exit
129 worker.on('exit', this.opts.exitHandler || empty)
130 this.workers.push(worker)
131 const { port1, port2 } = new MessageChannel()
132 worker.postMessage({ parent: port1 }, [port1])
133 worker.port1 = port1
134 worker.port2 = port2
135 // we will attach a listener for every task,
136 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
137 worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
138 // init tasks map
139 this.tasks.set(worker, 0)
140 return worker
141 }
142 }
143
144 module.exports = FixedThreadPool