Encapsulate logic of cluster and thread worker/pool (#116)
[poolifier.git] / src / worker / abstract-worker.ts
CommitLineData
c97c7edb
S
1import { AsyncResource } from 'async_hooks'
2import type { MessageValue } from '../utility-types'
3import type { WorkerOptions } from './worker-options'
4
5export abstract class AbstractWorker<
6 MainWorker,
7 // eslint-disable-next-line @typescript-eslint/no-explicit-any
8 Data = any,
9 // eslint-disable-next-line @typescript-eslint/no-explicit-any
10 Response = any
11> extends AsyncResource {
12 protected readonly maxInactiveTime: number
13 protected readonly async: boolean
14 protected lastTask: number
15 protected readonly interval?: NodeJS.Timeout
16
17 /**
18 *
19 * @param type The type of async event.
20 * @param isMain
21 * @param fn
22 * @param opts
23 */
24 public constructor (
25 type: string,
26 isMain: boolean,
27 fn: (data: Data) => Response,
28 public readonly opts: WorkerOptions = {}
29 ) {
30 super(type)
31
32 this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
33 this.async = !!this.opts.async
34 this.lastTask = Date.now()
35 if (!fn) throw new Error('Fn parameter is mandatory')
36 // keep the worker active
37 if (!isMain) {
38 this.interval = setInterval(
39 this.checkAlive.bind(this),
40 this.maxInactiveTime / 2
41 )
42 this.checkAlive.bind(this)()
43 }
44 }
45
46 protected abstract getMainWorker (): MainWorker
47
48 protected abstract sendToMainWorker (message: MessageValue<Response>): void
49
50 protected checkAlive (): void {
51 if (Date.now() - this.lastTask > this.maxInactiveTime) {
52 this.sendToMainWorker({ kill: 1 })
53 }
54 }
55
56 protected handleError (e: Error | string): string {
57 return (e as unknown) as string
58 }
59
60 protected run (
61 fn: (data?: Data) => Response,
62 value: MessageValue<Data>
63 ): void {
64 try {
65 const res = fn(value.data)
66 this.sendToMainWorker({ data: res, id: value.id })
67 this.lastTask = Date.now()
68 } catch (e) {
69 const err = this.handleError(e)
70 this.sendToMainWorker({ error: err, id: value.id })
71 this.lastTask = Date.now()
72 }
73 }
74
75 protected runAsync (
76 fn: (data?: Data) => Promise<Response>,
77 value: MessageValue<Data>
78 ): void {
79 fn(value.data)
80 .then(res => {
81 this.sendToMainWorker({ data: res, id: value.id })
82 this.lastTask = Date.now()
83 return null
84 })
85 .catch(e => {
86 const err = this.handleError(e)
87 this.sendToMainWorker({ error: err, id: value.id })
88 this.lastTask = Date.now()
89 })
90 }
91}