Commit | Line | Data |
---|---|---|
4ade5f1f | 1 | import { AsyncResource } from 'async_hooks' |
fa699c42 | 2 | import { isMainThread, parentPort } from 'worker_threads' |
4ade5f1f S |
3 | |
/**
 * Options accepted by the worker constructor.
 */
export interface ThreadWorkerOptions {
  /**
   * Maximum time (in ms) to wait for tasks to work on. After this period
   * the newly created worker threads will die.
   *
   * @default 60000
   */
  maxInactiveTime?: number
  /**
   * `true` if your function contains async pieces, else `false`.
   *
   * @default false
   */
  async?: boolean
}
a32e02ba | 18 | |
/**
 * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
 *
 * When this worker has been inactive for longer than `maxInactiveTime` (1 minute by default),
 * it posts `{ kill: 1 }` to the port it received from the main thread.
 * Per the original author's note, a DynamicThreadPool uses this information to kill the workers
 * it created afterwards while keeping its minimum number of threads.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
  /** Inactivity threshold in ms; taken from `opts.maxInactiveTime`, 60000 when not provided. */
  protected readonly maxInactiveTime: number
  /** Whether the task function is executed through `runAsync` instead of `run`. */
  protected readonly async: boolean
  /** Timestamp (ms since epoch) of construction or of the last settled task. */
  protected lastTask: number
  /** Periodic inactivity check; only created when not running on the main thread. */
  protected readonly interval?: NodeJS.Timeout
  /** Port used to post results back; delivered once through a `{ parent }` message. */
  protected parent?: MessagePort

  /**
   * @param fn - Function executed for every task received from the parent port.
   * @param opts - Worker options.
   * @throws Error when `fn` is not provided.
   */
  public constructor (
    fn: (data: Data) => Response,
    public readonly opts: ThreadWorkerOptions = {}
  ) {
    super('worker-thread-pool:pioardi')

    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
    this.async = !!this.opts.async
    this.lastTask = Date.now()
    if (!fn) throw new Error('Fn parameter is mandatory')
    // keep the worker active
    if (!isMainThread) {
      this.interval = setInterval(
        this.checkAlive.bind(this),
        this.maxInactiveTime / 2
      )
      this.checkAlive()
    }
    parentPort?.on('message', value => {
      this.messageListener(value, fn)
    })
  }

  /**
   * Dispatches one message received on the parent port.
   *
   * - `{ data, id }`: runs the task function inside this resource's async scope.
   * - `{ parent }`: stores the port used to answer the main thread.
   * - `{ kill }`: stops the inactivity check and emits the destroy hook.
   *
   * @param value - Message received from the main thread.
   * @param fn - Task function given to the constructor.
   */
  protected messageListener (
    value: {
      data?: Data
      id?: number
      parent?: MessagePort
      kill?: number
    },
    fn: (data: Data) => Response
  ): void {
    // Compare against undefined instead of relying on truthiness, otherwise a
    // task with id 0 or a falsy payload (0, '', false) would be silently dropped.
    if (value?.data !== undefined && value.id !== undefined) {
      if (this.async) {
        this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
      } else {
        this.runInAsyncScope(this.run.bind(this), this, fn, value)
      }
    } else if (value?.parent) {
      // save the port to communicate with the main thread
      // this will be received once
      this.parent = value.parent
    } else if (value?.kill) {
      // here is time to kill this thread, just clearing the interval
      if (this.interval) clearInterval(this.interval)
      this.emitDestroy()
    }
  }

  /**
   * Posts `{ kill: 1 }` to the parent port when no task has settled for
   * longer than `maxInactiveTime`.
   */
  protected checkAlive (): void {
    if (Date.now() - this.lastTask > this.maxInactiveTime) {
      this.parent?.postMessage({ kill: 1 })
    }
  }

  /**
   * Runs a synchronous task and posts either `{ data, id }` or `{ error, id }`
   * to the parent port.
   *
   * @param fn - Task function.
   * @param value - Task payload and the id echoed back with the answer.
   */
  protected run (
    fn: (data: Data) => Response,
    value: { readonly data: Data; readonly id: number }
  ): void {
    try {
      const res = fn(value.data)
      this.parent?.postMessage({ data: res, id: value.id })
    } catch (e) {
      this.parent?.postMessage({ error: e, id: value.id })
    } finally {
      this.lastTask = Date.now()
    }
  }

  /**
   * Runs an asynchronous task and posts either `{ data, id }` or `{ error, id }`
   * to the parent port once it settles.
   *
   * @param fn - Task function returning a promise.
   * @param value - Task payload and the id echoed back with the answer.
   */
  protected runAsync (
    fn: (data: Data) => Promise<Response>,
    value: { readonly data: Data; readonly id: number }
  ): void {
    let pending: Promise<Response>
    try {
      // Promise.resolve also tolerates a task function that returns a plain value.
      pending = Promise.resolve(fn(value.data))
    } catch (e) {
      // A synchronous throw is reported like a rejection instead of escaping
      // as an uncaught exception.
      pending = Promise.reject(e)
    }
    pending
      .then(res => {
        this.parent?.postMessage({ data: res, id: value.id })
      })
      .catch(e => {
        this.parent?.postMessage({ error: e, id: value.id })
      })
      .finally(() => {
        this.lastTask = Date.now()
      })
  }
}