Commit | Line | Data |
---|---|---|
ee99693b S |
1 | /* eslint-disable @typescript-eslint/strict-boolean-expressions */ |
2 | ||
4ade5f1f | 3 | import { isMainThread, parentPort } from 'worker_threads' |
f045358d | 4 | |
4ade5f1f S |
5 | import { AsyncResource } from 'async_hooks' |
6 | ||
7 | export interface ThreadWorkerOptions { | |
8 | /** | |
9 | * Max time to wait tasks to work on (in ms), after this period the new worker threads will die. | |
10 | * | |
11 | * @default 60.000 ms | |
12 | */ | |
13 | maxInactiveTime?: number | |
14 | /** | |
15 | * `true` if your function contains async pieces, else `false`. | |
16 | * | |
17 | * @default false | |
18 | */ | |
19 | async?: boolean | |
20 | } | |
a32e02ba | 21 | |
a32e02ba | 22 | /** |
4ade5f1f S |
23 | * An example worker that will be always alive, you just need to **extend** this class if you want a static pool. |
24 | * | |
25 | * When this worker is inactive for more than 1 minute, it will send this info to the main thread, | |
26 | * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed. | |
27 | * | |
28 | * @author [Alessandro Pio Ardizio](https://github.com/pioardi) | |
a32e02ba | 29 | * @since 0.0.1 |
30 | */ | |
4ade5f1f S |
31 | export class ThreadWorker<Data = any, Response = any> extends AsyncResource { |
32 | protected readonly maxInactiveTime: number | |
33 | protected readonly async: boolean | |
34 | protected lastTask: number | |
ee99693b S |
35 | protected readonly interval?: NodeJS.Timeout |
36 | protected parent?: MessagePort | |
4ade5f1f S |
37 | |
38 | public constructor ( | |
39 | fn: (data: Data) => Response, | |
40 | public readonly opts: ThreadWorkerOptions = {} | |
41 | ) { | |
50811da2 | 42 | super('worker-thread-pool:pioardi') |
4ade5f1f | 43 | |
ee99693b | 44 | this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 |
7784f548 | 45 | this.async = !!this.opts.async |
a32e02ba | 46 | this.lastTask = Date.now() |
47 | if (!fn) throw new Error('Fn parameter is mandatory') | |
48 | // keep the worker active | |
49 | if (!isMainThread) { | |
cf9aa6c3 | 50 | this.interval = setInterval( |
51 | this._checkAlive.bind(this), | |
52 | this.maxInactiveTime / 2 | |
53 | ) | |
a32e02ba | 54 | this._checkAlive.bind(this)() |
55 | } | |
ee99693b S |
56 | parentPort?.on( |
57 | 'message', | |
58 | (value: { | |
59 | data?: Response | |
60 | _id?: number | |
61 | parent?: MessagePort | |
62 | kill?: number | |
63 | }) => { | |
64 | if (value?.data && value._id) { | |
65 | // here you will receive messages | |
66 | // console.log('This is the main thread ' + isMainThread) | |
67 | if (this.async) { | |
68 | this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) | |
69 | } else { | |
70 | this.runInAsyncScope(this._run.bind(this), this, fn, value) | |
71 | } | |
72 | } else if (value.parent) { | |
73 | // save the port to communicate with the main thread | |
74 | // this will be received once | |
75 | this.parent = value.parent | |
76 | } else if (value.kill) { | |
77 | // here is time to kill this thread, just clearing the interval | |
78 | if (this.interval) clearInterval(this.interval) | |
79 | this.emitDestroy() | |
7784f548 | 80 | } |
a32e02ba | 81 | } |
ee99693b | 82 | ) |
a32e02ba | 83 | } |
84 | ||
4ade5f1f | 85 | protected _checkAlive (): void { |
cf9aa6c3 | 86 | if (Date.now() - this.lastTask > this.maxInactiveTime) { |
ee99693b | 87 | this.parent?.postMessage({ kill: 1 }) |
a32e02ba | 88 | } |
89 | } | |
106744f7 | 90 | |
4ade5f1f S |
91 | protected _run ( |
92 | fn: (data: Data) => Response, | |
93 | value: { readonly data: Data, readonly _id: number } | |
94 | ): void { | |
106744f7 | 95 | try { |
8950a941 | 96 | const res = fn(value.data) |
ee99693b | 97 | this.parent?.postMessage({ data: res, _id: value._id }) |
106744f7 | 98 | this.lastTask = Date.now() |
99 | } catch (e) { | |
ee99693b | 100 | this.parent?.postMessage({ error: e, _id: value._id }) |
106744f7 | 101 | this.lastTask = Date.now() |
102 | } | |
103 | } | |
7784f548 | 104 | |
4ade5f1f S |
105 | protected _runAsync ( |
106 | fn: (data: Data) => Promise<Response>, | |
107 | value: { readonly data: Data, readonly _id: number } | |
108 | ): void { | |
cf9aa6c3 | 109 | fn(value.data) |
ee99693b S |
110 | .then(res => { |
111 | this.parent?.postMessage({ data: res, _id: value._id }) | |
cf9aa6c3 | 112 | this.lastTask = Date.now() |
113 | }) | |
ee99693b S |
114 | .catch(e => { |
115 | this.parent?.postMessage({ error: e, _id: value._id }) | |
cf9aa6c3 | 116 | this.lastTask = Date.now() |
117 | }) | |
7784f548 | 118 | } |
a32e02ba | 119 | } |