X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworker%2Fabstract-worker.ts;h=fbf99ab8f1dfffe8936b4f25140c3d9597772d3b;hb=67de15a041152a973334839ad2c73284b1b3cbb3;hp=10a0c82d0565de18529343f163b295688c22bc70;hpb=aee467366d8c393b79e7af82c6a7ab12338ee64e;p=poolifier.git diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 10a0c82d..fbf99ab8 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,7 +1,12 @@ import { AsyncResource } from 'node:async_hooks' import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' -import type { MessageValue } from '../utility-types' +import type { + MessageValue, + WorkerAsyncFunction, + WorkerFunction, + WorkerSyncFunction +} from '../utility-types' import { EMPTY_FUNCTION } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' import { KillBehaviors } from './worker-options' @@ -41,7 +46,7 @@ export abstract class AbstractWorker< public constructor ( type: string, protected readonly isMain: boolean, - fn: (data: Data) => Response, + fn: WorkerFunction, protected mainWorker: MainWorker | undefined | null, protected readonly opts: WorkerOptions = { /** @@ -56,10 +61,10 @@ export abstract class AbstractWorker< } ) { super(type) - this.checkFunctionInput(fn) this.checkWorkerOptions(this.opts) + this.checkFunctionInput(fn) if (!this.isMain) { - this.lastTaskTimestamp = Date.now() + this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( this.checkAlive.bind(this), (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 @@ -75,6 +80,30 @@ export abstract class AbstractWorker< ) } + private checkWorkerOptions (opts: WorkerOptions): void { + this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR + this.opts.maxInactiveTime = + opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME + this.opts.async = opts.async ?? false + } + + /** + * Checks if the `fn` parameter is passed to the constructor. + * + * @param fn - The function that should be defined. + */ + private checkFunctionInput (fn: WorkerFunction): void { + if (fn == null) throw new Error('fn parameter is mandatory') + if (typeof fn !== 'function') { + throw new TypeError('fn parameter is not a function') + } + if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) { + throw new Error( + 'fn parameter is an async function, please set the async option to true' + ) + } + } + /** * Worker message listener. * @@ -83,9 +112,9 @@ export abstract class AbstractWorker< */ protected messageListener ( message: MessageValue, - fn: (data: Data) => Response + fn: WorkerFunction ): void { - if (message.data != null && message.id != null) { + if (message.id != null && message.data != null) { // Task message received if (this.opts.async === true) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) @@ -102,25 +131,6 @@ export abstract class AbstractWorker< } } - private checkWorkerOptions (opts: WorkerOptions): void { - this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR - this.opts.maxInactiveTime = - opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME - this.opts.async = opts.async ?? false - } - - /** - * Checks if the `fn` parameter is passed to the constructor. - * - * @param fn - The function that should be defined. - */ - private checkFunctionInput (fn: (data: Data) => Response): void { - if (fn == null) throw new Error('fn parameter is mandatory') - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') - } - } - /** * Returns the main worker. * @@ -145,7 +155,7 @@ export abstract class AbstractWorker< */ protected checkAlive (): void { if ( - Date.now() - this.lastTaskTimestamp > + performance.now() - this.lastTaskTimestamp > (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) ) { this.sendToMainWorker({ kill: this.opts.killBehavior }) @@ -169,19 +179,23 @@ export abstract class AbstractWorker< * @param message - Input data for the given function. */ protected run ( - fn: (data?: Data) => Response, + fn: WorkerSyncFunction, message: MessageValue ): void { try { - const startTimestamp = Date.now() + const startTimestamp = performance.now() const res = fn(message.data) - const runTime = Date.now() - startTimestamp - this.sendToMainWorker({ data: res, id: message.id, runTime }) + const runTime = performance.now() - startTimestamp + this.sendToMainWorker({ + data: res, + id: message.id, + runTime + }) } catch (e) { const err = this.handleError(e as Error) this.sendToMainWorker({ error: err, id: message.id }) } finally { - !this.isMain && (this.lastTaskTimestamp = Date.now()) + !this.isMain && (this.lastTaskTimestamp = performance.now()) } } @@ -192,14 +206,18 @@ export abstract class AbstractWorker< * @param message - Input data for the given function. */ protected runAsync ( - fn: (data?: Data) => Promise, + fn: WorkerAsyncFunction, message: MessageValue ): void { - const startTimestamp = Date.now() + const startTimestamp = performance.now() fn(message.data) .then(res => { - const runTime = Date.now() - startTimestamp - this.sendToMainWorker({ data: res, id: message.id, runTime }) + const runTime = performance.now() - startTimestamp + this.sendToMainWorker({ + data: res, + id: message.id, + runTime + }) return null }) .catch(e => { @@ -207,7 +225,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ error: err, id: message.id }) }) .finally(() => { - !this.isMain && (this.lastTaskTimestamp = Date.now()) + !this.isMain && (this.lastTaskTimestamp = performance.now()) }) .catch(EMPTY_FUNCTION) }