Only allow primitive JSON for transfer between worker and main worker (#128)
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'async_hooks'
2 import type { MessageValue } from '../utility-types'
3 import type { WorkerOptions } from './worker-options'
4
5 export abstract class AbstractWorker<
6 MainWorker,
7 Data = unknown,
8 Response = unknown
9 > extends AsyncResource {
10 protected readonly maxInactiveTime: number
11 protected readonly async: boolean
12 protected lastTask: number
13 protected readonly interval?: NodeJS.Timeout
14
15 /**
16 *
17 * @param type The type of async event.
18 * @param isMain
19 * @param fn
20 * @param opts
21 */
22 public constructor (
23 type: string,
24 isMain: boolean,
25 fn: (data: Data) => Response,
26 public readonly opts: WorkerOptions = {}
27 ) {
28 super(type)
29
30 this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
31 this.async = !!this.opts.async
32 this.lastTask = Date.now()
33 if (!fn) throw new Error('Fn parameter is mandatory')
34 // keep the worker active
35 if (!isMain) {
36 this.interval = setInterval(
37 this.checkAlive.bind(this),
38 this.maxInactiveTime / 2
39 )
40 this.checkAlive.bind(this)()
41 }
42 }
43
44 protected abstract getMainWorker (): MainWorker
45
46 protected abstract sendToMainWorker (message: MessageValue<Response>): void
47
48 protected checkAlive (): void {
49 if (Date.now() - this.lastTask > this.maxInactiveTime) {
50 this.sendToMainWorker({ kill: 1 })
51 }
52 }
53
54 protected handleError (e: Error | string): string {
55 return (e as unknown) as string
56 }
57
58 protected run (
59 fn: (data?: Data) => Response,
60 value: MessageValue<Data>
61 ): void {
62 try {
63 const res = fn(value.data)
64 this.sendToMainWorker({ data: res, id: value.id })
65 this.lastTask = Date.now()
66 } catch (e) {
67 const err = this.handleError(e)
68 this.sendToMainWorker({ error: err, id: value.id })
69 this.lastTask = Date.now()
70 }
71 }
72
73 protected runAsync (
74 fn: (data?: Data) => Promise<Response>,
75 value: MessageValue<Data>
76 ): void {
77 fn(value.data)
78 .then(res => {
79 this.sendToMainWorker({ data: res, id: value.id })
80 this.lastTask = Date.now()
81 return null
82 })
83 .catch(e => {
84 const err = this.handleError(e)
85 this.sendToMainWorker({ error: err, id: value.id })
86 this.lastTask = Date.now()
87 })
88 }
89 }