Commit | Line | Data |
---|---|---|
4ade5f1f | 1 | import { isMainThread, parentPort } from 'worker_threads' |
f045358d | 2 | |
4ade5f1f S |
3 | import { AsyncResource } from 'async_hooks' |
4 | ||
5 | export interface ThreadWorkerOptions { | |
6 | /** | |
7 | * Max time to wait tasks to work on (in ms), after this period the new worker threads will die. | |
8 | * | |
9 | * @default 60.000 ms | |
10 | */ | |
11 | maxInactiveTime?: number | |
12 | /** | |
13 | * `true` if your function contains async pieces, else `false`. | |
14 | * | |
15 | * @default false | |
16 | */ | |
17 | async?: boolean | |
18 | } | |
a32e02ba | 19 | |
a32e02ba | 20 | /** |
4ade5f1f S |
21 | * An example worker that will be always alive, you just need to **extend** this class if you want a static pool. |
22 | * | |
23 | * When this worker is inactive for more than 1 minute, it will send this info to the main thread, | |
24 | * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed. | |
25 | * | |
26 | * @author [Alessandro Pio Ardizio](https://github.com/pioardi) | |
a32e02ba | 27 | * @since 0.0.1 |
28 | */ | |
4ade5f1f S |
29 | export class ThreadWorker<Data = any, Response = any> extends AsyncResource { |
30 | protected readonly maxInactiveTime: number | |
31 | protected readonly async: boolean | |
32 | protected lastTask: number | |
33 | protected readonly interval: NodeJS.Timeout | |
34 | protected parent: any | |
35 | ||
36 | public constructor ( | |
37 | fn: (data: Data) => Response, | |
38 | public readonly opts: ThreadWorkerOptions = {} | |
39 | ) { | |
50811da2 | 40 | super('worker-thread-pool:pioardi') |
4ade5f1f | 41 | |
cf9aa6c3 | 42 | this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60 |
7784f548 | 43 | this.async = !!this.opts.async |
a32e02ba | 44 | this.lastTask = Date.now() |
45 | if (!fn) throw new Error('Fn parameter is mandatory') | |
46 | // keep the worker active | |
47 | if (!isMainThread) { | |
cf9aa6c3 | 48 | this.interval = setInterval( |
49 | this._checkAlive.bind(this), | |
50 | this.maxInactiveTime / 2 | |
51 | ) | |
a32e02ba | 52 | this._checkAlive.bind(this)() |
53 | } | |
4ade5f1f | 54 | parentPort.on('message', (value) => { |
506c2a14 | 55 | if (value && value.data && value._id) { |
a32e02ba | 56 | // here you will receive messages |
57 | // console.log('This is the main thread ' + isMainThread) | |
7784f548 | 58 | if (this.async) { |
59 | this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) | |
60 | } else { | |
61 | this.runInAsyncScope(this._run.bind(this), this, fn, value) | |
62 | } | |
a32e02ba | 63 | } else if (value.parent) { |
64 | // save the port to communicate with the main thread | |
65 | // this will be received once | |
66 | this.parent = value.parent | |
67 | } else if (value.kill) { | |
68 | // here is time to kill this thread, just clearing the interval | |
69 | clearInterval(this.interval) | |
34a572eb | 70 | this.emitDestroy() |
a32e02ba | 71 | } |
72 | }) | |
73 | } | |
74 | ||
4ade5f1f | 75 | protected _checkAlive (): void { |
cf9aa6c3 | 76 | if (Date.now() - this.lastTask > this.maxInactiveTime) { |
a32e02ba | 77 | this.parent.postMessage({ kill: 1 }) |
78 | } | |
79 | } | |
106744f7 | 80 | |
4ade5f1f S |
81 | protected _run ( |
82 | fn: (data: Data) => Response, | |
83 | value: { readonly data: Data, readonly _id: number } | |
84 | ): void { | |
106744f7 | 85 | try { |
8950a941 | 86 | const res = fn(value.data) |
106744f7 | 87 | this.parent.postMessage({ data: res, _id: value._id }) |
88 | this.lastTask = Date.now() | |
89 | } catch (e) { | |
90 | this.parent.postMessage({ error: e, _id: value._id }) | |
91 | this.lastTask = Date.now() | |
92 | } | |
93 | } | |
7784f548 | 94 | |
4ade5f1f S |
95 | protected _runAsync ( |
96 | fn: (data: Data) => Promise<Response>, | |
97 | value: { readonly data: Data, readonly _id: number } | |
98 | ): void { | |
cf9aa6c3 | 99 | fn(value.data) |
4ade5f1f | 100 | .then((res) => { |
cf9aa6c3 | 101 | this.parent.postMessage({ data: res, _id: value._id }) |
102 | this.lastTask = Date.now() | |
103 | }) | |
4ade5f1f | 104 | .catch((e) => { |
cf9aa6c3 | 105 | this.parent.postMessage({ error: e, _id: value._id }) |
106 | this.lastTask = Date.now() | |
107 | }) | |
7784f548 | 108 | } |
a32e02ba | 109 | } |