36eddace5ec149c41b8daafbd4ada0b91babb32d
1 import { isMainThread
, MessageChannel
, SHARE_ENV
, Worker
} from
'worker_threads'
2 import type { Draft
, MessageValue
} from
'../../utility-types'
4 export type WorkerWithMessageChannel
= Worker
& Draft
<MessageChannel
>
6 export interface FixedThreadPoolOptions
{
8 * A function that will listen for the error event on each worker thread.
10 errorHandler
?: (this: Worker
, e
: Error) => void
12 * A function that will listen for the online event on each worker thread.
14 onlineHandler
?: (this: Worker
) => void
16 * A function that will listen for the exit event on each worker thread.
18 exitHandler
?: (this: Worker
, code
: number) => void
20 * Used to set `maxListeners` on event emitters (workers are event emitters); its only purpose is to avoid unhelpful warning messages.
28 * A thread pool with a static number of threads. It is possible to execute tasks in sync or async mode, as you prefer.
30 * This pool will select the worker thread in a round robin fashion.
32 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
35 // eslint-disable-next-line @typescript-eslint/no-explicit-any
36 export class FixedThreadPool
<Data
= any, Response
= any> {
37 public readonly workers
: WorkerWithMessageChannel
[] = []
38 public nextWorker
: number = 0
40 // threadId as key and an integer value
41 public readonly tasks
: Map
<WorkerWithMessageChannel
, number> = new Map
<
42 WorkerWithMessageChannel
,
46 protected id
: number = 0
49 * @param numThreads Number of threads for this worker pool.
50 * @param filePath Path to a file with an implementation of the `ThreadWorker` class; a relative path is fine.
51 * @param opts An object with possible options, for example `errorHandler` or `onlineHandler`. Default: `{ maxTasks: 1000 }`
54 public readonly numThreads
: number,
55 public readonly filePath
: string,
56 public readonly opts
: FixedThreadPoolOptions
= { maxTasks
: 1000 }
59 throw new Error('Cannot start a thread pool from a worker thread !!!')
61 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
63 throw new Error('Please specify a file with a worker implementation')
66 for (let i
= 1; i
<= this.numThreads
; i
++) {
71 public async destroy (): Promise
<void> {
72 for (const worker
of this.workers
) {
73 await worker
.terminate()
78 * Execute the task specified in the constructor with the data parameter.
80 * @param data The input for the task specified.
81 * @returns Promise that is resolved when the task is done.
83 public execute (data
: Data
): Promise
<Response
> {
84 // configure worker to handle message with the specified task
85 const worker
= this.chooseWorker()
86 const previousWorkerIndex
= this.tasks
.get(worker
)
87 if (previousWorkerIndex
!== undefined) {
88 this.tasks
.set(worker
, previousWorkerIndex
+ 1)
90 throw Error('Worker could not be found in tasks map')
93 const res
= this.internalExecute(worker
, id
)
94 worker
.postMessage({ data
: data
|| {}, id
: id
})
98 protected internalExecute (
99 worker
: WorkerWithMessageChannel
,
101 ): Promise
<Response
> {
102 return new Promise((resolve
, reject
) => {
103 const listener
: (message
: MessageValue
<Response
>) => void = message
=> {
104 if (message
.id
=== id
) {
105 worker
.port2
?.removeListener('message', listener
)
106 const previousWorkerIndex
= this.tasks
.get(worker
)
107 if (previousWorkerIndex
!== undefined) {
108 this.tasks
.set(worker
, previousWorkerIndex
+ 1)
110 throw Error('Worker could not be found in tasks map')
112 if (message
.error
) reject(message
.error
)
113 else resolve(message
.data
as Response
)
116 worker
.port2
?.on('message', listener
)
120 protected chooseWorker (): WorkerWithMessageChannel
{
121 if (this.workers
.length
- 1 === this.nextWorker
) {
123 return this.workers
[this.nextWorker
]
126 return this.workers
[this.nextWorker
]
130 protected newWorker (): WorkerWithMessageChannel
{
131 const worker
: WorkerWithMessageChannel
= new Worker(this.filePath
, {
134 worker
.on('error', this.opts
.errorHandler
?? (() => {}))
135 worker
.on('online', this.opts
.onlineHandler
?? (() => {}))
136 // TODO: properly handle the case when a thread exits
137 worker
.on('exit', this.opts
.exitHandler
?? (() => {}))
138 this.workers
.push(worker
)
139 const { port1
, port2
} = new MessageChannel()
140 worker
.postMessage({ parent: port1
}, [port1
])
143 // We attach a listener for every task.
144 // The listener is removed when the task completes, but to avoid warnings we increase the max listeners limit.
145 worker
.port2
.setMaxListeners(this.opts
.maxTasks
?? 1000)
147 this.tasks
.set(worker
, 0)