1 import { isMainThread
, MessageChannel
, SHARE_ENV
, Worker
} from
'worker_threads'
2 import type { Draft
, MessageValue
} from
'../../utility-types'
3 import type { PoolOptions
} from
'../abstract-pool'
4 import { AbstractPool
} from
'../abstract-pool'
6 export type ThreadWorkerWithMessageChannel
= Worker
& Draft
<MessageChannel
>
9 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
11 * This pool will select the worker thread in a round robin fashion.
13 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
16 // eslint-disable-next-line @typescript-eslint/no-explicit-any
17 export class FixedThreadPool
<Data
= any, Response
= any> extends AbstractPool
<
18 ThreadWorkerWithMessageChannel
,
23 * @param numThreads Num of threads for this worker pool.
24 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
25 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
30 opts
: PoolOptions
<ThreadWorkerWithMessageChannel
> = { maxTasks
: 1000 }
32 super(numThreads
, filePath
, opts
)
35 protected isMain (): boolean {
39 protected async destroyWorker (
40 worker
: ThreadWorkerWithMessageChannel
42 await worker
.terminate()
45 protected sendToWorker (
46 worker
: ThreadWorkerWithMessageChannel
,
47 message
: MessageValue
<Data
>
49 worker
.postMessage(message
)
52 protected registerWorkerMessageListener (
53 port
: ThreadWorkerWithMessageChannel
,
54 listener
: (message
: MessageValue
<Response
>) => void
56 port
.port2
?.on('message', listener
)
59 protected unregisterWorkerMessageListener (
60 port
: ThreadWorkerWithMessageChannel
,
61 listener
: (message
: MessageValue
<Response
>) => void
63 port
.port2
?.removeListener('message', listener
)
66 protected newWorker (): ThreadWorkerWithMessageChannel
{
67 return new Worker(this.filePath
, {
72 protected afterNewWorkerPushed (
73 worker
: ThreadWorkerWithMessageChannel
75 const { port1
, port2
} = new MessageChannel()
76 worker
.postMessage({ parent: port1
}, [port1
])
79 // we will attach a listener for every task,
80 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
81 worker
.port2
.setMaxListeners(this.opts
.maxTasks
?? 1000)