1 import { isMainThread
, MessageChannel
, SHARE_ENV
, Worker
} from
'worker_threads'
2 import type { Draft
, JSONValue
, MessageValue
} from
'../../utility-types'
3 import type { PoolOptions
} from
'../abstract-pool'
4 import { AbstractPool
} from
'../abstract-pool'
/**
 * Worker type handled by the thread pool: a `worker_threads` `Worker`
 * intersected with the members of `MessageChannel` (`port1` / `port2`)
 * as reshaped by the project's `Draft` utility type.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
9 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer.
11 * This pool will select the worker thread in a round robin fashion.
13 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
16 export class FixedThreadPool
<
17 Data
extends JSONValue
= JSONValue
,
18 Response
extends JSONValue
= JSONValue
19 > extends AbstractPool
<ThreadWorkerWithMessageChannel
, Data
, Response
> {
21 * @param numThreads Number of threads for this worker pool.
22 * @param filePath Path to a file containing an implementation of the `ThreadWorker` class; a relative path is fine.
23 * @param opts An object with possible options, for example `errorHandler` or `onlineHandler`. Default: `{ maxTasks: 1000 }`
// Constructor fragment: `opts` defaults to `{ maxTasks: 1000 }` when omitted,
// and `numThreads`, `filePath` and `opts` are forwarded unchanged to the
// `AbstractPool` constructor via `super(...)`.
// NOTE(review): the start of the signature is not visible in this view.
28 opts
: PoolOptions
<ThreadWorkerWithMessageChannel
> = { maxTasks
: 1000 }
30 super(numThreads
, filePath
, opts
)
/**
 * Returns a boolean telling the pool whether it is running on the main side.
 * NOTE(review): the body is not visible in this view — presumably it returns
 * `isMainThread` imported from `worker_threads`; confirm.
 */
33 protected isMain (): boolean {
/**
 * Shuts down a single worker of the pool.
 *
 * Awaits `worker.terminate()`, so the returned promise settles only once
 * the termination call has completed.
 *
 * @param worker The worker thread to terminate.
 */
37 protected async destroyWorker (
38 worker
: ThreadWorkerWithMessageChannel
40 await worker
.terminate()
/**
 * Delivers a message to the given worker.
 *
 * The message is handed to `worker.postMessage` as-is; nothing is added
 * to or removed from it here.
 *
 * @param worker The worker thread that should receive the message.
 * @param message The message to post to the worker.
 */
43 protected sendToWorker (
44 worker
: ThreadWorkerWithMessageChannel
,
45 message
: MessageValue
<Data
>
47 worker
.postMessage(message
)
/**
 * Subscribes `listener` to the `'message'` event of the worker's `port2`.
 *
 * The call goes through optional chaining (`port2?.on`), so when `port2`
 * is not set nothing is registered and no error is raised.
 *
 * @param port The worker whose `port2` should be listened on.
 * @param listener Callback invoked with each message received on `port2`.
 */
50 protected registerWorkerMessageListener (
51 port
: ThreadWorkerWithMessageChannel
,
52 listener
: (message
: MessageValue
<Response
>) => void
54 port
.port2
?.on('message', listener
)
/**
 * Removes `listener` from the `'message'` event of the worker's `port2`.
 *
 * Uses optional chaining (`port2?.removeListener`), so it is a no-op when
 * `port2` is not set.
 *
 * @param port The worker whose `port2` listener should be removed.
 * @param listener The exact callback reference that was registered earlier.
 */
57 protected unregisterWorkerMessageListener (
58 port
: ThreadWorkerWithMessageChannel
,
59 listener
: (message
: MessageValue
<Response
>) => void
61 port
.port2
?.removeListener('message', listener
)
/**
 * Creates a new `Worker` that runs the script at `this.filePath` and returns it.
 * NOTE(review): the options object passed to the `Worker` constructor is not
 * visible in this view — presumably it uses the imported `SHARE_ENV`; confirm.
 */
64 protected newWorker (): ThreadWorkerWithMessageChannel
{
65 return new Worker(this.filePath
, {
/**
 * Wires up a dedicated `MessageChannel` for a freshly created worker.
 *
 * A new channel is created, its `port1` is sent to the worker inside a
 * `{ parent: port1 }` message, and the listener limit of `worker.port2`
 * is raised to `this.opts.maxTasks` (falling back to 1000 when unset).
 *
 * NOTE(review): `worker.port2` is read below, but where it is assigned is
 * not visible in this view — confirm it is set before this point.
 *
 * @param worker The worker that was just created.
 */
70 protected afterNewWorkerPushed (
71 worker
: ThreadWorkerWithMessageChannel
73 const { port1
, port2
} = new MessageChannel()
// `port1` is also listed in the second argument (the transfer list), so the
// port is transferred to the worker rather than cloned.
74 worker
.postMessage({ parent: port1
}, [port1
])
77 // we will attach a listener for every task,
78 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
79 worker
.port2
.setMaxListeners(this.opts
.maxTasks
?? 1000)