Merge most of the stuff from jerome-benoit:issue-62-ts-standard
[poolifier.git] / src / workers.ts
CommitLineData
4ade5f1f 1import { isMainThread, parentPort } from 'worker_threads'
f045358d 2
4ade5f1f
S
3import { AsyncResource } from 'async_hooks'
4
5export interface ThreadWorkerOptions {
6 /**
7 * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
8 *
9 * @default 60.000 ms
10 */
11 maxInactiveTime?: number
12 /**
13 * `true` if your function contains async pieces, else `false`.
14 *
15 * @default false
16 */
17 async?: boolean
18}
a32e02ba 19
a32e02ba 20/**
4ade5f1f
S
21 * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
22 *
23 * When this worker is inactive for more than 1 minute, it will send this info to the main thread,
24 * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed.
25 *
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
a32e02ba 27 * @since 0.0.1
28 */
4ade5f1f
S
29export class ThreadWorker<Data = any, Response = any> extends AsyncResource {
30 protected readonly maxInactiveTime: number
31 protected readonly async: boolean
32 protected lastTask: number
33 protected readonly interval: NodeJS.Timeout
34 protected parent: any
35
36 public constructor (
37 fn: (data: Data) => Response,
38 public readonly opts: ThreadWorkerOptions = {}
39 ) {
50811da2 40 super('worker-thread-pool:pioardi')
4ade5f1f 41
cf9aa6c3 42 this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60
7784f548 43 this.async = !!this.opts.async
a32e02ba 44 this.lastTask = Date.now()
45 if (!fn) throw new Error('Fn parameter is mandatory')
46 // keep the worker active
47 if (!isMainThread) {
cf9aa6c3 48 this.interval = setInterval(
49 this._checkAlive.bind(this),
50 this.maxInactiveTime / 2
51 )
a32e02ba 52 this._checkAlive.bind(this)()
53 }
4ade5f1f 54 parentPort.on('message', (value) => {
506c2a14 55 if (value && value.data && value._id) {
a32e02ba 56 // here you will receive messages
57 // console.log('This is the main thread ' + isMainThread)
7784f548 58 if (this.async) {
59 this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
60 } else {
61 this.runInAsyncScope(this._run.bind(this), this, fn, value)
62 }
a32e02ba 63 } else if (value.parent) {
64 // save the port to communicate with the main thread
65 // this will be received once
66 this.parent = value.parent
67 } else if (value.kill) {
68 // here is time to kill this thread, just clearing the interval
69 clearInterval(this.interval)
34a572eb 70 this.emitDestroy()
a32e02ba 71 }
72 })
73 }
74
4ade5f1f 75 protected _checkAlive (): void {
cf9aa6c3 76 if (Date.now() - this.lastTask > this.maxInactiveTime) {
a32e02ba 77 this.parent.postMessage({ kill: 1 })
78 }
79 }
106744f7 80
4ade5f1f
S
81 protected _run (
82 fn: (data: Data) => Response,
83 value: { readonly data: Data, readonly _id: number }
84 ): void {
106744f7 85 try {
8950a941 86 const res = fn(value.data)
106744f7 87 this.parent.postMessage({ data: res, _id: value._id })
88 this.lastTask = Date.now()
89 } catch (e) {
90 this.parent.postMessage({ error: e, _id: value._id })
91 this.lastTask = Date.now()
92 }
93 }
7784f548 94
4ade5f1f
S
95 protected _runAsync (
96 fn: (data: Data) => Promise<Response>,
97 value: { readonly data: Data, readonly _id: number }
98 ): void {
cf9aa6c3 99 fn(value.data)
4ade5f1f 100 .then((res) => {
cf9aa6c3 101 this.parent.postMessage({ data: res, _id: value._id })
102 this.lastTask = Date.now()
103 })
4ade5f1f 104 .catch((e) => {
cf9aa6c3 105 this.parent.postMessage({ error: e, _id: value._id })
106 this.lastTask = Date.now()
107 })
7784f548 108 }
a32e02ba 109}