a7060770be39e70a73a9a8327d9d4f99bc1582c5
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import type { Worker } from 'node:cluster'
3 import type { MessagePort } from 'node:worker_threads'
4 import type {
5 MessageValue,
6 TaskFunctions,
7 WorkerAsyncFunction,
8 WorkerFunction,
9 WorkerSyncFunction
10 } from '../utility-types'
11 import { EMPTY_FUNCTION, isPlainObject } from '../utils'
12 import type { KillBehavior, WorkerOptions } from './worker-options'
13 import { KillBehaviors } from './worker-options'
14
15 const DEFAULT_FUNCTION_NAME = 'default'
16 const DEFAULT_MAX_INACTIVE_TIME = 60000
17 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
18
19 /**
20 * Base class that implements some shared logic for all poolifier workers.
21 *
22 * @typeParam MainWorker - Type of main worker.
23 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be serializable data.
24 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be serializable data.
25 */
26 export abstract class AbstractWorker<
27 MainWorker extends Worker | MessagePort,
28 Data = unknown,
29 Response = unknown
30 > extends AsyncResource {
31 /**
32 * Task function(s) processed by the worker when the pool's `execution` function is invoked.
33 */
34 protected taskFunctions!: Map<string, WorkerFunction<Data, Response>>
35 /**
36 * Timestamp of the last task processed by this worker.
37 */
38 protected lastTaskTimestamp!: number
39 /**
40 * Handler id of the `aliveInterval` worker alive check.
41 */
42 protected readonly aliveInterval?: NodeJS.Timeout
43 /**
44 * Constructs a new poolifier worker.
45 *
46 * @param type - The type of async event.
47 * @param isMain - Whether this is the main worker or not.
48 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
49 * @param mainWorker - Reference to main worker.
50 * @param opts - Options for the worker.
51 */
52 public constructor (
53 type: string,
54 protected readonly isMain: boolean,
55 taskFunctions:
56 | WorkerFunction<Data, Response>
57 | TaskFunctions<Data, Response>,
58 protected mainWorker: MainWorker | undefined | null,
59 protected readonly opts: WorkerOptions = {
60 /**
61 * The kill behavior option on this worker or its default value.
62 */
63 killBehavior: DEFAULT_KILL_BEHAVIOR,
64 /**
65 * The maximum time to keep this worker alive while idle.
66 * The pool automatically checks and terminates this worker when the time expires.
67 */
68 maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME
69 }
70 ) {
71 super(type)
72 this.checkWorkerOptions(this.opts)
73 this.checkTaskFunctions(taskFunctions)
74 if (!this.isMain) {
75 this.lastTaskTimestamp = performance.now()
76 this.aliveInterval = setInterval(
77 this.checkAlive.bind(this),
78 (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
79 )
80 this.checkAlive.bind(this)()
81 }
82
83 this.mainWorker?.on('message', this.messageListener.bind(this))
84 }
85
86 private checkWorkerOptions (opts: WorkerOptions): void {
87 this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
88 this.opts.maxInactiveTime =
89 opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
90 }
91
92 /**
93 * Checks if the `taskFunctions` parameter is passed to the constructor.
94 *
95 * @param taskFunctions - The task function(s) parameter that should be checked.
96 */
97 private checkTaskFunctions (
98 taskFunctions:
99 | WorkerFunction<Data, Response>
100 | TaskFunctions<Data, Response>
101 ): void {
102 if (taskFunctions == null) {
103 throw new Error('taskFunctions parameter is mandatory')
104 }
105 this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
106 if (typeof taskFunctions === 'function') {
107 this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
108 } else if (isPlainObject(taskFunctions)) {
109 let firstEntry = true
110 for (const [name, fn] of Object.entries(taskFunctions)) {
111 if (typeof fn !== 'function') {
112 throw new TypeError(
113 'A taskFunctions parameter object value is not a function'
114 )
115 }
116 this.taskFunctions.set(name, fn.bind(this))
117 if (firstEntry) {
118 this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this))
119 firstEntry = false
120 }
121 }
122 if (firstEntry) {
123 throw new Error('taskFunctions parameter object is empty')
124 }
125 } else {
126 throw new TypeError(
127 'taskFunctions parameter is not a function or a plain object'
128 )
129 }
130 }
131
132 /**
133 * Worker message listener.
134 *
135 * @param message - Message received.
136 */
137 protected messageListener (message: MessageValue<Data, MainWorker>): void {
138 if (message.id != null && message.data != null) {
139 // Task message received
140 const fn = this.getTaskFunction(message.name)
141 if (fn?.constructor.name === 'AsyncFunction') {
142 this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
143 } else {
144 this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
145 }
146 } else if (message.parent != null) {
147 // Main worker reference message received
148 this.mainWorker = message.parent
149 } else if (message.kill != null) {
150 // Kill message received
151 this.aliveInterval != null && clearInterval(this.aliveInterval)
152 this.emitDestroy()
153 }
154 }
155
156 /**
157 * Returns the main worker.
158 *
159 * @returns Reference to the main worker.
160 */
161 protected getMainWorker (): MainWorker {
162 if (this.mainWorker == null) {
163 throw new Error('Main worker was not set')
164 }
165 return this.mainWorker
166 }
167
168 /**
169 * Sends a message to the main worker.
170 *
171 * @param message - The response message.
172 */
173 protected abstract sendToMainWorker (message: MessageValue<Response>): void
174
175 /**
176 * Checks if the worker should be terminated, because its living too long.
177 */
178 protected checkAlive (): void {
179 if (
180 performance.now() - this.lastTaskTimestamp >
181 (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
182 ) {
183 this.sendToMainWorker({ kill: this.opts.killBehavior })
184 }
185 }
186
187 /**
188 * Handles an error and convert it to a string so it can be sent back to the main worker.
189 *
190 * @param e - The error raised by the worker.
191 * @returns Message of the error.
192 */
193 protected handleError (e: Error | string): string {
194 return e as string
195 }
196
197 /**
198 * Runs the given function synchronously.
199 *
200 * @param fn - Function that will be executed.
201 * @param message - Input data for the given function.
202 */
203 protected runSync (
204 fn: WorkerSyncFunction<Data, Response>,
205 message: MessageValue<Data>
206 ): void {
207 try {
208 const startTimestamp = performance.now()
209 const res = fn(message.data)
210 const runTime = performance.now() - startTimestamp
211 this.sendToMainWorker({
212 data: res,
213 id: message.id,
214 runTime
215 })
216 } catch (e) {
217 const err = this.handleError(e as Error)
218 this.sendToMainWorker({ error: err, id: message.id })
219 } finally {
220 !this.isMain && (this.lastTaskTimestamp = performance.now())
221 }
222 }
223
224 /**
225 * Runs the given function asynchronously.
226 *
227 * @param fn - Function that will be executed.
228 * @param message - Input data for the given function.
229 */
230 protected runAsync (
231 fn: WorkerAsyncFunction<Data, Response>,
232 message: MessageValue<Data>
233 ): void {
234 const startTimestamp = performance.now()
235 fn(message.data)
236 .then(res => {
237 const runTime = performance.now() - startTimestamp
238 this.sendToMainWorker({
239 data: res,
240 id: message.id,
241 runTime
242 })
243 return null
244 })
245 .catch(e => {
246 const err = this.handleError(e as Error)
247 this.sendToMainWorker({ error: err, id: message.id })
248 })
249 .finally(() => {
250 !this.isMain && (this.lastTaskTimestamp = performance.now())
251 })
252 .catch(EMPTY_FUNCTION)
253 }
254
255 /**
256 * Gets the task function in the given scope.
257 *
258 * @param name - Name of the function that will be returned.
259 */
260 private getTaskFunction (name?: string): WorkerFunction<Data, Response> {
261 name = name ?? DEFAULT_FUNCTION_NAME
262 const fn = this.taskFunctions.get(name)
263 if (fn == null) {
264 throw new Error(`Task function "${name}" not found`)
265 }
266 return fn
267 }
268 }