Merge most of the stuff from jerome-benoit:issue-62-ts-standard
[poolifier.git] / src / workers.ts
1 import { isMainThread, parentPort } from 'worker_threads'
2
3 import { AsyncResource } from 'async_hooks'
4
5 export interface ThreadWorkerOptions {
6 /**
7 * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
8 *
9 * @default 60.000 ms
10 */
11 maxInactiveTime?: number
12 /**
13 * `true` if your function contains async pieces, else `false`.
14 *
15 * @default false
16 */
17 async?: boolean
18 }
19
20 /**
21 * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
22 *
23 * When this worker is inactive for more than 1 minute, it will send this info to the main thread,
24 * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed.
25 *
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
27 * @since 0.0.1
28 */
29 export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
30 protected readonly maxInactiveTime: number
31 protected readonly async: boolean
32 protected lastTask: number
33 protected readonly interval: NodeJS.Timeout
34 protected parent: any
35
36 public constructor (
37 fn: (data: Data) => Response,
38 public readonly opts: ThreadWorkerOptions = {}
39 ) {
40 super('worker-thread-pool:pioardi')
41
42 this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60
43 this.async = !!this.opts.async
44 this.lastTask = Date.now()
45 if (!fn) throw new Error('Fn parameter is mandatory')
46 // keep the worker active
47 if (!isMainThread) {
48 this.interval = setInterval(
49 this._checkAlive.bind(this),
50 this.maxInactiveTime / 2
51 )
52 this._checkAlive.bind(this)()
53 }
54 parentPort.on('message', (value) => {
55 if (value && value.data && value._id) {
56 // here you will receive messages
57 // console.log('This is the main thread ' + isMainThread)
58 if (this.async) {
59 this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
60 } else {
61 this.runInAsyncScope(this._run.bind(this), this, fn, value)
62 }
63 } else if (value.parent) {
64 // save the port to communicate with the main thread
65 // this will be received once
66 this.parent = value.parent
67 } else if (value.kill) {
68 // here is time to kill this thread, just clearing the interval
69 clearInterval(this.interval)
70 this.emitDestroy()
71 }
72 })
73 }
74
75 protected _checkAlive (): void {
76 if (Date.now() - this.lastTask > this.maxInactiveTime) {
77 this.parent.postMessage({ kill: 1 })
78 }
79 }
80
81 protected _run (
82 fn: (data: Data) => Response,
83 value: { readonly data: Data, readonly _id: number }
84 ): void {
85 try {
86 const res = fn(value.data)
87 this.parent.postMessage({ data: res, _id: value._id })
88 this.lastTask = Date.now()
89 } catch (e) {
90 this.parent.postMessage({ error: e, _id: value._id })
91 this.lastTask = Date.now()
92 }
93 }
94
95 protected _runAsync (
96 fn: (data: Data) => Promise<Response>,
97 value: { readonly data: Data, readonly _id: number }
98 ): void {
99 fn(value.data)
100 .then((res) => {
101 this.parent.postMessage({ data: res, _id: value._id })
102 this.lastTask = Date.now()
103 })
104 .catch((e) => {
105 this.parent.postMessage({ error: e, _id: value._id })
106 this.lastTask = Date.now()
107 })
108 }
109 }