Commit | Line | Data |
---|---|---|
fa699c42 | 1 | import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' |
d3c8a1a8 | 2 | import type { Draft, JSONValue, MessageValue } from '../../utility-types' |
c97c7edb S |
3 | import type { PoolOptions } from '../abstract-pool' |
4 | import { AbstractPool } from '../abstract-pool' | |
4ade5f1f | 5 | |
c97c7edb | 6 | export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel> |
4ade5f1f S |
7 | |
8 | /** | |
9 | * A thread pool with a static number of threads. It is possible to execute tasks in sync or async mode, as you prefer. | |
10 | * | |
11 | * This pool will select the worker thread in a round robin fashion. | |
12 | * | |
13 | * @author [Alessandro Pio Ardizio](https://github.com/pioardi) | |
14 | * @since 0.0.1 | |
15 | */ | |
d3c8a1a8 S |
16 | export class FixedThreadPool< |
17 | Data extends JSONValue = JSONValue, | |
18 | Response extends JSONValue = JSONValue | |
19 | > extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> { | |
4ade5f1f S |
20 | /** |
21 | * @param numThreads Number of threads for this worker pool. | |
22 | * @param filePath Path to a file with an implementation of the `ThreadWorker` class; a relative path is fine. | |
23 | * @param opts An object with possible options, for example `errorHandler` and `onlineHandler`. Default: `{ maxTasks: 1000 }` | |
24 | */ | |
25 | public constructor ( | |
c97c7edb S |
26 | numThreads: number, |
27 | filePath: string, | |
28 | opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 } | |
4ade5f1f | 29 | ) { |
c97c7edb S |
30 | super(numThreads, filePath, opts) |
31 | } | |
4ade5f1f | 32 | |
c97c7edb S |
33 | protected isMain (): boolean { |
34 | return isMainThread | |
4ade5f1f S |
35 | } |
36 | ||
c97c7edb S |
37 | protected async destroyWorker ( |
38 | worker: ThreadWorkerWithMessageChannel | |
39 | ): Promise<void> { | |
40 | await worker.terminate() | |
f2fdaa86 | 41 | // FIXME: The tests are currently failing, so these must be changed first |
4ade5f1f S |
42 | } |
43 | ||
c97c7edb S |
44 | protected sendToWorker ( |
45 | worker: ThreadWorkerWithMessageChannel, | |
46 | message: MessageValue<Data> | |
47 | ): void { | |
48 | worker.postMessage(message) | |
4ade5f1f S |
49 | } |
50 | ||
c97c7edb S |
51 | protected registerWorkerMessageListener ( |
52 | port: ThreadWorkerWithMessageChannel, | |
53 | listener: (message: MessageValue<Response>) => void | |
54 | ): void { | |
55 | port.port2?.on('message', listener) | |
4ade5f1f S |
56 | } |
57 | ||
c97c7edb S |
58 | protected unregisterWorkerMessageListener ( |
59 | port: ThreadWorkerWithMessageChannel, | |
60 | listener: (message: MessageValue<Response>) => void | |
61 | ): void { | |
62 | port.port2?.removeListener('message', listener) | |
4ade5f1f S |
63 | } |
64 | ||
c97c7edb S |
65 | protected newWorker (): ThreadWorkerWithMessageChannel { |
66 | return new Worker(this.filePath, { | |
4ade5f1f S |
67 | env: SHARE_ENV |
68 | }) | |
c97c7edb S |
69 | } |
70 | ||
71 | protected afterNewWorkerPushed ( | |
72 | worker: ThreadWorkerWithMessageChannel | |
73 | ): void { | |
4ade5f1f S |
74 | const { port1, port2 } = new MessageChannel() |
75 | worker.postMessage({ parent: port1 }, [port1]) | |
76 | worker.port1 = port1 | |
77 | worker.port2 = port2 | |
78 | // A listener is attached for every task and removed once the task is completed. | |
79 | // To avoid warnings while many tasks are pending, we increase the max listeners size. | |
ee99693b | 80 | worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) |
4ade5f1f S |
81 | } |
82 | } |