e750126b929ec93cb7d8e38bcfd9d754eefe6059
[poolifier.git] / src / worker / cluster-worker.ts
1 import { AsyncResource } from 'async_hooks'
2 import { isMaster, worker } from 'cluster'
3 import type { MessageValue } from '../utility-types'
4 import type { WorkerOptions } from './worker-options'
5
/**
 * A cluster worker that stays alive between tasks; you just need to **extend** this class if you want a static pool.
 *
 * Every received message that carries both `data` and `id` is treated as a task: the task function
 * is called with `data` and the outcome is sent back as `{ data, id }` on success or
 * `{ error, id }` on failure.
 *
 * When this worker has been inactive for longer than `maxInactiveTime` (1 minute by default),
 * it sends `{ kill: 1 }` to the main worker. If you are using DynamicClusterPool, the workers
 * created afterwards will be killed, while the minimum number of workers stays guaranteed.
 *
 * @template Data Type of the data passed to the task function.
 * @template Response Type of the value produced by the task function.
 *
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
  /** Inactivity threshold in milliseconds; defaults to 60 000 when `opts.maxInactiveTime` is not set. */
  protected readonly maxInactiveTime: number
  /** Whether tasks are dispatched through `runAsync` (the task function returns a promise). */
  protected readonly async: boolean
  /** Timestamp (`Date.now()`) of construction or of the last finished task. */
  protected lastTask: number
  /** Handle of the periodic inactivity check; only set when not running in the master process. */
  protected readonly interval?: NodeJS.Timeout

  /**
   * Registers the message listener and, outside of the master process, starts the inactivity check.
   *
   * @param fn Function executed for every task message. Mandatory.
   * @param opts Worker options (`maxInactiveTime`, `async`).
   * @throws {Error} If `fn` is missing.
   */
  public constructor (
    fn: (data: Data) => Response,
    public readonly opts: WorkerOptions = {}
  ) {
    super('worker-cluster-pool:pioardi')

    this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
    this.async = !!this.opts.async
    this.lastTask = Date.now()
    if (!fn) throw new Error('Fn parameter is mandatory')
    // keep the worker active
    if (!isMaster) {
      this.interval = setInterval(
        this.checkAlive.bind(this),
        this.maxInactiveTime / 2
      )
      this.checkAlive()
    }
    worker.on('message', (value: MessageValue<Data>) => {
      // Compare against undefined instead of relying on truthiness, so that
      // falsy payloads (0, '', false) and a task id of 0 are still executed
      // and answered instead of being silently dropped.
      if (value?.data !== undefined && value.id !== undefined) {
        // a task to execute
        if (this.async) {
          this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
        } else {
          this.runInAsyncScope(this.run.bind(this), this, fn, value)
        }
      } else if (value?.kill) {
        // here is time to kill this worker, just clearing the interval
        if (this.interval) clearInterval(this.interval)
        this.emitDestroy()
      }
    })
  }

  /**
   * Sends `{ kill: 1 }` to the main worker when no task has finished for more than `maxInactiveTime`.
   */
  protected checkAlive (): void {
    if (Date.now() - this.lastTask > this.maxInactiveTime) {
      worker.send({ kill: 1 })
    }
  }

  /**
   * Runs a synchronous task and sends its result, or its error, back to the main worker.
   *
   * @param fn Task function to execute.
   * @param value Task message; `value.data` is passed to `fn` and `value.id` is echoed in the reply.
   */
  protected run (
    fn: (data?: Data) => Response,
    value: MessageValue<Data>
  ): void {
    try {
      const res = fn(value.data as Data)
      worker.send({ data: res, id: value.id })
    } catch (e) {
      // Error instances are reduced to their message; anything else is sent as is.
      const err = e instanceof Error ? e.message : e
      worker.send({ error: err, id: value.id })
    } finally {
      this.lastTask = Date.now()
    }
  }

  /**
   * Runs a promise-returning task and sends its result, or its error, back to the main worker.
   *
   * @param fn Task function to execute.
   * @param value Task message; `value.data` is passed to `fn` and `value.id` is echoed in the reply.
   */
  protected runAsync (
    fn: (data?: Data) => Promise<Response>,
    value: MessageValue<Data>
  ): void {
    let pending: Promise<Response>
    try {
      // A task function may throw before returning its promise, or return a
      // plain value; both cases must still produce a reply for this task id
      // rather than an exception escaping from the message listener.
      pending = Promise.resolve(fn(value.data))
    } catch (e) {
      const err = e instanceof Error ? e.message : e
      worker.send({ error: err, id: value.id })
      this.lastTask = Date.now()
      return
    }
    pending
      .then(res => {
        worker.send({ data: res, id: value.id })
        this.lastTask = Date.now()
        return null
      })
      .catch(e => {
        const err = e instanceof Error ? e.message : e
        worker.send({ error: err, id: value.id })
        this.lastTask = Date.now()
      })
  }
}