d0af904bfb16d9fd2ba396684a12fd2220d3c89e
1 import { AsyncResource
} from
'async_hooks'
2 import { isMainThread
, parentPort
} from
'worker_threads'
3 import type { MessageValue
} from
'../utility-types'
4 import type { WorkerOptions
} from
'./worker-options'
7 * An example worker that is always kept alive; you just need to **extend** this class if you want a static pool.
9 * When this worker has been inactive for longer than `maxInactiveTime` (1 minute by default), it reports this to the main thread.
10 * If you are using DynamicThreadPool, the workers created afterwards will be killed, while the minimum number of threads is still guaranteed.
12 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
15 // eslint-disable-next-line @typescript-eslint/no-explicit-any
16 export class ThreadWorker
<Data
= any, Response
= any> extends AsyncResource
{
// Worker-side wrapper: listens on `parentPort`, runs the supplied task function for each
// task message and posts the outcome (or the error) through the stored `parent` port.
// NOTE(review): this listing is a lossy extract. The constructor header, the runner method
// signatures, the branch/try/catch scaffolding and the closing braces are not visible here.

// Inactivity threshold in milliseconds; read from opts.maxInactiveTime, defaulting to 1000 * 60.
17 protected readonly maxInactiveTime
: number
// Boolean form of opts.async. Presumably selects runAsync over run; the selecting
// condition is not visible in this extract (TODO confirm).
18 protected readonly async: boolean
// Date.now() timestamp set at construction and refreshed after each task outcome is posted;
// checkAlive compares it against maxInactiveTime.
19 protected lastTask
: number
// Handle of the periodic checkAlive timer; cleared when a `kill` message is received.
20 protected readonly interval
?: NodeJS
.Timeout
// Port used to post results, errors and kill notices; assigned from a message carrying `parent`.
21 protected parent?: MessagePort
// Constructor parameters (the `constructor (` line itself is not visible in this extract):
//   fn   - task function called with the `data` of each task message; mandatory.
//   opts - worker options; `maxInactiveTime` and `async` are the fields read below.
24 fn
: (data
: Data
) => Response
,
25 public readonly opts
: WorkerOptions
= {}
// The string is handed to the AsyncResource constructor as the async resource type.
27 super('worker-thread-pool:pioardi')
29 this.maxInactiveTime
= this.opts
.maxInactiveTime
?? 1000 * 60
30 this.async = !!this.opts
.async
31 this.lastTask
= Date.now()
// A falsy fn is rejected here, i.e. only after super() and the field assignments above have run.
32 if (!fn
) throw new Error('Fn parameter is mandatory')
33 // keep the worker active
// checkAlive is scheduled every maxInactiveTime / 2 milliseconds and also called once immediately.
// NOTE(review): the embedded numbering skips 34, 38 and 40, so an enclosing statement may be
// missing around this timer setup; confirm against the full file.
35 this.interval
= setInterval(
36 this.checkAlive
.bind(this),
37 this.maxInactiveTime
/ 2
39 this.checkAlive
.bind(this)()
// Message protocol handled below: { data, id } runs a task, { parent } stores the reply port,
// { kill } stops the checkAlive timer.
41 parentPort
?.on('message', (value
: MessageValue
<Data
>) => {
// NOTE(review): `value?.data` is a truthiness test, so a task whose data is 0, '' or false does
// not enter the task branch. Also, the later `value.parent` read has no `?.`, so a nullish
// message would throw there. Confirm whether either case can occur.
42 if (value
?.data
&& value
.id
) {
43 // here you will receive messages
44 // console.log('This is the main thread ' + isMainThread)
// Both calls execute the runner inside this AsyncResource's execution context, passing fn and
// the message as arguments. The condition choosing between them is not visible in this
// extract; presumably `this.async` (TODO confirm).
46 this.runInAsyncScope(this.runAsync
.bind(this), this, fn
, value
)
48 this.runInAsyncScope(this.run
.bind(this), this, fn
, value
)
50 } else if (value
.parent) {
51 // save the port to communicate with the main thread
52 // this will be received once
53 this.parent = value
.parent
54 } else if (value
.kill
) {
55 // here is time to kill this thread, just clearing the interval
56 if (this.interval
) clearInterval(this.interval
)
// Posts { kill: 1 } through the parent port once more than maxInactiveTime milliseconds have
// elapsed since lastTask; does nothing otherwise (and nothing if no parent port is stored yet).
62 protected checkAlive (): void {
63 if (Date.now() - this.lastTask
> this.maxInactiveTime
) {
64 this.parent?.postMessage({ kill
: 1 })
// Parameters of the synchronous runner (its signature line is not visible; presumably `run`):
//   fn    - task function returning Response directly.
//   value - the task message; `data` is passed to fn and `id` is echoed in the reply.
69 fn
: (data
?: Data
) => Response
,
70 value
: MessageValue
<Data
>
// Success path: call fn with the message data and post { data, id } back to the parent port.
73 const res
= fn(value
.data
)
74 this.parent?.postMessage({ data
: res
, id
: value
.id
})
75 this.lastTask
= Date.now()
// Failure path: post { error, id } instead. The construct binding `e` is not visible in this
// extract; presumably a catch clause (TODO confirm). lastTask is refreshed on both paths.
77 this.parent?.postMessage({ error
: e
, id
: value
.id
})
78 this.lastTask
= Date.now()
// Parameters of the asynchronous runner (its signature line is not visible; presumably `runAsync`):
//   fn    - task function returning Promise<Response>.
//   value - the task message; `id` is echoed in the reply.
83 fn
: (data
?: Data
) => Promise
<Response
>,
84 value
: MessageValue
<Data
>
// Post { data, id } with the task result. `res` is bound on a line not visible here;
// presumably the resolved value of fn's promise (TODO confirm).
88 this.parent?.postMessage({ data
: res
, id
: value
.id
})
89 this.lastTask
= Date.now()
// Post { error, id } for a failed task. `e` is bound on a line not visible here;
// presumably the promise rejection reason (TODO confirm). lastTask is refreshed here too.
93 this.parent?.postMessage({ error
: e
, id
: value
.id
})
94 this.lastTask
= Date.now()