Commit | Line | Data |
---|---|---|
fa699c42 | 1 | import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' |
d3c8a1a8 | 2 | import type { Draft, JSONValue, MessageValue } from '../../utility-types' |
c97c7edb S |
3 | import type { PoolOptions } from '../abstract-pool' |
4 | import { AbstractPool } from '../abstract-pool' | |
4ade5f1f | 5 | |
c97c7edb | 6 | export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel> |
4ade5f1f S |
7 | |
8 | /** | |
9 | * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer. | |
10 | * | |
11 | * This pool will select the worker thread in a round robin fashion. | |
12 | * | |
13 | * @author [Alessandro Pio Ardizio](https://github.com/pioardi) | |
14 | * @since 0.0.1 | |
15 | */ | |
d3c8a1a8 S |
16 | export class FixedThreadPool< |
17 | Data extends JSONValue = JSONValue, | |
18 | Response extends JSONValue = JSONValue | |
19 | > extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> { | |
4ade5f1f S |
20 | /** |
21 | * @param numThreads Num of threads for this worker pool. | |
22 | * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine. | |
23 | * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` | |
24 | */ | |
25 | public constructor ( | |
c97c7edb S |
26 | numThreads: number, |
27 | filePath: string, | |
28 | opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 } | |
4ade5f1f | 29 | ) { |
c97c7edb S |
30 | super(numThreads, filePath, opts) |
31 | } | |
4ade5f1f | 32 | |
c97c7edb S |
33 | protected isMain (): boolean { |
34 | return isMainThread | |
4ade5f1f S |
35 | } |
36 | ||
c97c7edb S |
37 | protected async destroyWorker ( |
38 | worker: ThreadWorkerWithMessageChannel | |
39 | ): Promise<void> { | |
40 | await worker.terminate() | |
4ade5f1f S |
41 | } |
42 | ||
c97c7edb S |
43 | protected sendToWorker ( |
44 | worker: ThreadWorkerWithMessageChannel, | |
45 | message: MessageValue<Data> | |
46 | ): void { | |
47 | worker.postMessage(message) | |
4ade5f1f S |
48 | } |
49 | ||
c97c7edb S |
50 | protected registerWorkerMessageListener ( |
51 | port: ThreadWorkerWithMessageChannel, | |
52 | listener: (message: MessageValue<Response>) => void | |
53 | ): void { | |
54 | port.port2?.on('message', listener) | |
4ade5f1f S |
55 | } |
56 | ||
c97c7edb S |
57 | protected unregisterWorkerMessageListener ( |
58 | port: ThreadWorkerWithMessageChannel, | |
59 | listener: (message: MessageValue<Response>) => void | |
60 | ): void { | |
61 | port.port2?.removeListener('message', listener) | |
4ade5f1f S |
62 | } |
63 | ||
c97c7edb S |
64 | protected newWorker (): ThreadWorkerWithMessageChannel { |
65 | return new Worker(this.filePath, { | |
4ade5f1f S |
66 | env: SHARE_ENV |
67 | }) | |
c97c7edb S |
68 | } |
69 | ||
70 | protected afterNewWorkerPushed ( | |
71 | worker: ThreadWorkerWithMessageChannel | |
72 | ): void { | |
4ade5f1f S |
73 | const { port1, port2 } = new MessageChannel() |
74 | worker.postMessage({ parent: port1 }, [port1]) | |
75 | worker.port1 = port1 | |
76 | worker.port2 = port2 | |
77 | // we will attach a listener for every task, | |
78 | // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size | |
ee99693b | 79 | worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) |
4ade5f1f S |
80 | } |
81 | } |