Commit | Line | Data |
---|---|---|
fa699c42 | 1 | import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' |
325f50bc | 2 | import type { Draft, MessageValue } from '../../utility-types' |
c97c7edb S |
3 | import type { PoolOptions } from '../abstract-pool' |
4 | import { AbstractPool } from '../abstract-pool' | |
4ade5f1f | 5 | |
/**
 * A worker thread that can also carry the two ports of a `MessageChannel`
 * (`port1` and `port2`).
 *
 * NOTE(review): `Draft` is declared in `../../utility-types`; presumably it
 * makes the channel ports optional and writable, since the pool assigns them
 * only after the worker has been created - confirm against its definition.
 */
export type ThreadWorkerWithMessageChannel =
  Worker & Draft<MessageChannel>
4ade5f1f S |
7 | |
/**
 * A thread pool with a fixed (static) number of worker threads. Tasks can be
 * executed in sync or async mode, whichever you prefer.
 *
 * This pool will select the worker thread in a round robin fashion.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class FixedThreadPool<Data = any, Response = any> extends AbstractPool<
  ThreadWorkerWithMessageChannel,
  Data,
  Response
> {
  /**
   * Builds a new fixed-size thread pool.
   *
   * @param numThreads Number of threads for this worker pool.
   * @param filePath Path to a file with an implementation of the `ThreadWorker` class; a relative path is fine.
   * @param opts Pool options, for example `errorHandler` or `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    numThreads: number,
    filePath: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
  ) {
    super(numThreads, filePath, opts)
  }

  /**
   * Tells whether the current code is running on the main thread.
   *
   * @returns The value of `isMainThread` from `worker_threads`.
   */
  protected isMain (): boolean {
    return isMainThread
  }

  /**
   * Terminates the given worker thread and waits for the termination to settle.
   *
   * @param worker The worker to terminate.
   */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    await worker.terminate()
  }

  /**
   * Posts a message to the given worker thread.
   *
   * @param worker The worker that receives the message.
   * @param message The message to post.
   */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /**
   * Subscribes `listener` to the `'message'` event of the worker's `port2`.
   * Does nothing when the worker has no `port2` attached.
   *
   * @param port The worker whose `port2` is listened on.
   * @param listener Callback invoked with each message received.
   */
  protected registerWorkerMessageListener (
    port: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Response>) => void
  ): void {
    const receivingPort = port.port2
    if (receivingPort == null) {
      return
    }
    receivingPort.on('message', listener)
  }

  /**
   * Removes `listener` from the `'message'` event of the worker's `port2`.
   * Does nothing when the worker has no `port2` attached.
   *
   * @param port The worker whose `port2` the listener was attached to.
   * @param listener The callback to remove.
   */
  protected unregisterWorkerMessageListener (
    port: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Response>) => void
  ): void {
    const receivingPort = port.port2
    if (receivingPort == null) {
      return
    }
    receivingPort.removeListener('message', listener)
  }

  /**
   * Spawns a worker thread running the file at `this.filePath`, passing
   * `SHARE_ENV` as its `env` option.
   *
   * @returns The newly created worker.
   */
  protected newWorker (): ThreadWorkerWithMessageChannel {
    const spawned = new Worker(this.filePath, { env: SHARE_ENV })
    return spawned
  }

  /**
   * Sets up a dedicated `MessageChannel` for a freshly created worker:
   * `port1` is transferred to the worker inside a `{ parent: port1 }` message,
   * and both ports are stored on the worker object.
   *
   * @param worker The worker that has just been created.
   */
  protected afterNewWorkerPushed (
    worker: ThreadWorkerWithMessageChannel
  ): void {
    const channel = new MessageChannel()
    worker.postMessage({ parent: channel.port1 }, [channel.port1])
    worker.port1 = channel.port1
    worker.port2 = channel.port2
    // A listener is attached for every task and removed once the task
    // completes; the max listeners size is raised to avoid warnings while
    // many tasks are in flight.
    const listenerLimit = this.opts.maxTasks ?? 1000
    channel.port2.setMaxListeners(listenerLimit)
  }
}