990f2208bc0cd506d12776eaf1397035116294c9
1 import { isMainThread
, parentPort
} from
'worker_threads'
3 import { AsyncResource
} from
'async_hooks'
5 export interface ThreadWorkerOptions
{
7 * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
11 maxInactiveTime
?: number
13 * `true` if your function contains async pieces, else `false`.
21 * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
23 * When this worker is inactive for more than 1 minute, it will send this info to the main thread,
24 * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed.
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
29 export class ThreadWorker
<Data
= any, Response
= any> extends AsyncResource
{
30 protected readonly maxInactiveTime
: number
31 protected readonly async: boolean
32 protected lastTask
: number
33 protected readonly interval
: NodeJS
.Timeout
37 fn
: (data
: Data
) => Response
,
38 public readonly opts
: ThreadWorkerOptions
= {}
40 super('worker-thread-pool:pioardi')
42 this.maxInactiveTime
= this.opts
.maxInactiveTime
|| 1000 * 60
43 this.async = !!this.opts
.async
44 this.lastTask
= Date.now()
45 if (!fn
) throw new Error('Fn parameter is mandatory')
46 // keep the worker active
48 this.interval
= setInterval(
49 this._checkAlive
.bind(this),
50 this.maxInactiveTime
/ 2
52 this._checkAlive
.bind(this)()
54 parentPort
.on('message', (value
) => {
55 if (value
&& value
.data
&& value
._id
) {
56 // here you will receive messages
57 // console.log('This is the main thread ' + isMainThread)
59 this.runInAsyncScope(this._runAsync
.bind(this), this, fn
, value
)
61 this.runInAsyncScope(this._run
.bind(this), this, fn
, value
)
63 } else if (value
.parent) {
64 // save the port to communicate with the main thread
65 // this will be received once
66 this.parent = value
.parent
67 } else if (value
.kill
) {
68 // here is time to kill this thread, just clearing the interval
69 clearInterval(this.interval
)
75 protected _checkAlive (): void {
76 if (Date.now() - this.lastTask
> this.maxInactiveTime
) {
77 this.parent.postMessage({ kill
: 1 })
82 fn
: (data
: Data
) => Response
,
83 value
: { readonly data
: Data
, readonly _id
: number }
86 const res
= fn(value
.data
)
87 this.parent.postMessage({ data
: res
, _id
: value
._id
})
88 this.lastTask
= Date.now()
90 this.parent.postMessage({ error
: e
, _id
: value
._id
})
91 this.lastTask
= Date.now()
96 fn
: (data
: Data
) => Promise
<Response
>,
97 value
: { readonly data
: Data
, readonly _id
: number }
101 this.parent.postMessage({ data
: res
, _id
: value
._id
})
102 this.lastTask
= Date.now()
105 this.parent.postMessage({ error
: e
, _id
: value
._id
})
106 this.lastTask
= Date.now()