1 /* eslint-disable @typescript-eslint/strict-boolean-expressions */
3 import { MessageChannel
, SHARE_ENV
, Worker
, isMainThread
} from
'worker_threads'
5 function empty (): void {}
8 export type Draft
<T
> = { -readonly [P
in keyof T
]?: T
[P
] }
10 export type WorkerWithMessageChannel
= Worker
& Draft
<MessageChannel
>
12 export interface FixedThreadPoolOptions
{
14 * A function that will listen for error event on each worker thread.
16 errorHandler
?: (this: Worker
, e
: Error) => void
18 * A function that will listen for online event on each worker thread.
20 onlineHandler
?: (this: Worker
) => void
22 * A function that will listen for exit event on each worker thread.
24 exitHandler
?: (this: Worker
, code
: number) => void
26 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
34 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
36 * This pool will select the worker thread in a round robin fashion.
38 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
41 export default class FixedThreadPool
<Data
= any, Response
= any> {
42 public readonly workers
: WorkerWithMessageChannel
[] = []
43 public nextWorker
: number = 0
45 // threadId as key and an integer value
46 /* eslint-disable @typescript-eslint/indent */
47 public readonly tasks
: Map
<WorkerWithMessageChannel
, number> = new Map
<
48 WorkerWithMessageChannel
,
51 /* eslint-enable @typescript-eslint/indent */
53 protected _id
: number = 0
56 * @param numThreads Num of threads for this worker pool.
57 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
58 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
61 public readonly numThreads
: number,
62 public readonly filePath
: string,
63 public readonly opts
: FixedThreadPoolOptions
= { maxTasks
: 1000 }
66 throw new Error('Cannot start a thread pool from a worker thread !!!')
68 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
70 throw new Error('Please specify a file with a worker implementation')
73 for (let i
= 1; i
<= this.numThreads
; i
++) {
78 public async destroy (): Promise
<void> {
79 for (const worker
of this.workers
) {
80 await worker
.terminate()
85 * Execute the task specified into the constructor with the data parameter.
87 * @param data The input for the task specified.
88 * @returns Promise that is resolved when the task is done.
90 // eslint-disable-next-line @typescript-eslint/promise-function-async
91 public execute (data
: Data
): Promise
<Response
> {
92 // configure worker to handle message with the specified task
93 const worker
= this._chooseWorker()
94 this.tasks
.set(worker
, (this.tasks
.get(worker
) ?? 0) + 1)
96 const res
= this._execute(worker
, id
)
97 worker
.postMessage({ data
: data
|| _void
, _id
: id
})
101 // eslint-disable-next-line @typescript-eslint/promise-function-async
103 worker
: WorkerWithMessageChannel
,
105 ): Promise
<Response
> {
106 return new Promise((resolve
, reject
) => {
107 const listener
= (message
: {
112 if (message
._id
=== id
) {
113 worker
.port2
?.removeListener('message', listener
)
114 this.tasks
.set(worker
, (this.tasks
.get(worker
) ?? 0) - 1)
115 if (message
.error
) reject(message
.error
)
116 else resolve(message
.data
)
119 worker
.port2
?.on('message', listener
)
123 protected _chooseWorker (): WorkerWithMessageChannel
{
124 if (this.workers
.length
- 1 === this.nextWorker
) {
126 return this.workers
[this.nextWorker
]
129 return this.workers
[this.nextWorker
]
133 protected _newWorker (): WorkerWithMessageChannel
{
134 const worker
: WorkerWithMessageChannel
= new Worker(this.filePath
, {
137 worker
.on('error', this.opts
.errorHandler
?? empty
)
138 worker
.on('online', this.opts
.onlineHandler
?? empty
)
139 // TODO handle properly when a thread exit
140 worker
.on('exit', this.opts
.exitHandler
?? empty
)
141 this.workers
.push(worker
)
142 const { port1
, port2
} = new MessageChannel()
143 worker
.postMessage({ parent: port1
}, [port1
])
146 // we will attach a listener for every task,
147 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
148 worker
.port2
.setMaxListeners(this.opts
.maxTasks
?? 1000)
150 this.tasks
.set(worker
, 0)
155 module.exports
= FixedThreadPool