e750126b929ec93cb7d8e38bcfd9d754eefe6059
1 import { AsyncResource
} from
'async_hooks'
2 import { isMaster
, worker
} from
'cluster'
3 import type { MessageValue
} from
'../utility-types'
4 import type { WorkerOptions
} from
'./worker-options'
7 * An example worker that is always kept alive; simply **extend** this class if you want a static pool.
9 * When this worker has been inactive for more than 1 minute, it reports this to the main worker.
10 * If you are using DynamicClusterPool, the workers created afterwards will be killed; the minimum number of workers is always guaranteed.
12 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
15 // eslint-disable-next-line @typescript-eslint/no-explicit-any
16 export class ClusterWorker
<Data
= any, Response
= any> extends AsyncResource
{
17 protected readonly maxInactiveTime
: number
18 protected readonly async: boolean
19 protected lastTask
: number
20 protected readonly interval
?: NodeJS
.Timeout
23 fn
: (data
: Data
) => Response
,
24 public readonly opts
: WorkerOptions
= {}
26 super('worker-cluster-pool:pioardi')
28 this.maxInactiveTime
= this.opts
.maxInactiveTime
?? 1000 * 60
29 this.async = !!this.opts
.async
30 this.lastTask
= Date.now()
31 if (!fn
) throw new Error('Fn parameter is mandatory')
32 // keep the worker active
34 // console.log('ClusterWorker#constructor', 'is not master')
35 this.interval
= setInterval(
36 this.checkAlive
.bind(this),
37 this.maxInactiveTime
/ 2
39 this.checkAlive
.bind(this)()
41 worker
.on('message', (value
: MessageValue
<Data
>) => {
42 // console.log("cluster.on('message', value)", value)
43 if (value
?.data
&& value
.id
) {
44 // here you will receive messages
45 // console.log('This is the main worker ' + isMaster)
47 this.runInAsyncScope(this.runAsync
.bind(this), this, fn
, value
)
49 this.runInAsyncScope(this.run
.bind(this), this, fn
, value
)
51 } else if (value
.kill
) {
52 // here is time to kill this worker, just clearing the interval
53 if (this.interval
) clearInterval(this.interval
)
/**
 * Checks how long this worker has been idle and, if the idle time exceeds
 * `maxInactiveTime`, sends a `{ kill: 1 }` message through `worker.send`.
 *
 * The idle time is the difference between `Date.now()` and `lastTask`,
 * so both values are in milliseconds.
 */
protected checkAlive (): void {
  const idleTime = Date.now() - this.lastTask
  const inactiveTooLong = idleTime > this.maxInactiveTime
  // Still within the allowed inactivity window: nothing to report.
  if (!inactiveTooLong) return
  worker.send({ kill: 1 })
}
66 fn
: (data
?: Data
) => Response
,
67 value
: MessageValue
<Data
>
70 const res
= fn(value
.data
as Data
)
71 worker
.send({ data
: res
, id
: value
.id
})
72 this.lastTask
= Date.now()
74 const err
= e
instanceof Error ? e
.message
: e
75 worker
.send({ error
: err
, id
: value
.id
})
76 this.lastTask
= Date.now()
81 fn
: (data
?: Data
) => Promise
<Response
>,
82 value
: MessageValue
<Data
>
86 worker
.send({ data
: res
, id
: value
.id
})
87 this.lastTask
= Date.now()
91 const err
= e
instanceof Error ? e
.message
: e
92 worker
.send({ error
: err
, id
: value
.id
})
93 this.lastTask
= Date.now()