Rewrite to TypeScript
[poolifier.git] / src / fixed.ts
CommitLineData
4ade5f1f
S
1import {
2 isMainThread,
3 MessageChannel,
4 SHARE_ENV,
5 Worker
6} from 'worker_threads'
7
/**
 * No-op callback, used as the fallback listener when the caller does not
 * supply a handler for a worker event.
 */
function empty (): void {
  // intentionally left blank
}

/**
 * Shared empty object, used as the placeholder payload when a task is
 * submitted with falsy data.
 */
const _void = {}
10
/**
 * A writable, all-optional view of `T`: every property of `T` loses its
 * `readonly` modifier and becomes optional.
 */
export type Draft<T> = {
  -readonly [Key in keyof T]?: T[Key]
}

/**
 * A `Worker` that can additionally carry the `port1` / `port2` properties
 * of a `MessageChannel`, both optional and assignable.
 */
export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
14
/**
 * Optional settings accepted by the fixed thread pool.
 */
export interface FixedThreadPoolOptions {
  /**
   * Listener registered for the `error` event of every worker thread.
   */
  errorHandler?: (this: Worker, e: Error) => void

  /**
   * Listener registered for the `online` event of every worker thread.
   */
  onlineHandler?: (this: Worker) => void

  /**
   * Listener registered for the `exit` event of every worker thread.
   */
  exitHandler?: (this: Worker, code: number) => void

  /**
   * Value passed to `setMaxListeners` on the event emitter the pool uses to
   * receive task results. One listener is attached per pending task, so a
   * high limit only serves to silence pointless "max listeners" warnings.
   *
   * @default 1000
   */
  maxTasks?: number
}
35
/**
 * A thread pool with a static number of worker threads.
 *
 * Each call to `execute` dispatches one task to one worker; workers are
 * selected in a round robin fashion.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export default class FixedThreadPool<Data = any, Response = any> {
  /** Workers created by this pool, in creation order. */
  public readonly workers: WorkerWithMessageChannel[] = []

  /**
   * Round robin cursor: index in `workers` of the worker returned by the
   * last `_chooseWorker` call (0 before any call).
   */
  public nextWorker: number = 0

  /** Number of tasks currently pending, keyed by the worker they were sent to. */
  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
    WorkerWithMessageChannel,
    number
  >()

  /** Last task id handed out; incremented once per `execute` call. */
  protected _id: number = 0

  /**
   * @param numThreads Num of threads for this worker pool.
   * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   * @throws Error when called from a worker thread, or when `filePath` is empty.
   */
  public constructor (
    public readonly numThreads: number,
    public readonly filePath: string,
    public readonly opts: FixedThreadPoolOptions = { maxTasks: 1000 }
  ) {
    if (!isMainThread) {
      throw new Error('Cannot start a thread pool from a worker thread !!!')
    }
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    for (let i = 1; i <= this.numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker of the pool.
   *
   * All terminations are started together, so one failing worker does not
   * prevent the others from being asked to stop.
   *
   * @returns Promise that is resolved once every worker has terminated.
   */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => worker.terminate()))
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified.
   * @returns Promise that is resolved when the task is done.
   */
  public async execute (data: Data): Promise<Response> {
    const worker = this._chooseWorker()
    this._updateTaskCount(worker, 1)
    const id = ++this._id
    // Attach the result listener before posting, so the reply cannot be missed.
    const res = this._execute(worker, id)
    // Falsy data is replaced by an empty object.
    // NOTE(review): this also replaces 0, '' and false; presumably the worker
    // side requires a truthy `data` field - confirm before changing to `??`.
    worker.postMessage({ data: data || _void, _id: id })
    return res
  }

  /**
   * Wait for the reply to the task identified by `id` on the worker's `port2`.
   *
   * The listener removes itself and decrements the worker's pending task
   * count as soon as the matching reply arrives.
   *
   * @param worker The worker the task was sent to.
   * @param id The id of the task, as sent in the `_id` field of the request.
   * @returns Promise resolved with the reply's `data`, or rejected with the reply's `error` when it is set.
   */
  protected _execute (
    worker: WorkerWithMessageChannel,
    id: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      const port = worker.port2
      if (port === undefined) {
        // Without a port no reply can ever arrive: fail fast instead of hanging.
        this._updateTaskCount(worker, -1)
        reject(new Error('Worker has no message port to receive the task result'))
        return
      }
      const listener = (message: {
        _id: number
        data: Response
        error?: unknown
      }): void => {
        // Every pending task listens on the same port: ignore replies to other tasks.
        if (message._id !== id) return
        port.removeListener('message', listener)
        this._updateTaskCount(worker, -1)
        if (message.error) reject(message.error)
        else resolve(message.data)
      }
      port.on('message', listener)
    })
  }

  /**
   * Select the worker for the next task, in round robin order.
   *
   * The cursor is advanced first, then the worker at the new position is
   * returned; it wraps to 0 when it is at or past the last worker.
   *
   * @returns The selected worker.
   * @throws Error when the pool has no worker.
   */
  protected _chooseWorker (): WorkerWithMessageChannel {
    const lastIndex = this.workers.length - 1
    if (lastIndex < 0) {
      throw new Error('Cannot choose a worker: the pool has no worker threads')
    }
    this.nextWorker = this.nextWorker >= lastIndex ? 0 : this.nextWorker + 1
    return this.workers[this.nextWorker]
  }

  /**
   * Create a worker thread, wire its event handlers and message channel,
   * and register it in `workers` and `tasks`.
   *
   * @returns The newly created worker.
   */
  protected _newWorker (): WorkerWithMessageChannel {
    const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
      env: SHARE_ENV
    })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker thread (presumably it replies through
    // it); the pool listens for task results on port2.
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    port2.setMaxListeners(this.opts.maxTasks || 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }

  /**
   * Add `delta` to the pending task count of `worker`.
   *
   * A worker that is not yet in the map is treated as having 0 pending
   * tasks, so the stored count can never become `NaN`.
   *
   * @param worker The worker whose count is updated.
   * @param delta The amount to add (negative to decrement).
   */
  protected _updateTaskCount (
    worker: WorkerWithMessageChannel,
    delta: number
  ): void {
    const current = this.tasks.get(worker)
    this.tasks.set(worker, (current === undefined ? 0 : current) + delta)
  }
}

// Also assign the class to `module.exports`, so a plain CommonJS `require`
// of this module yields the class itself.
module.exports = FixedThreadPool