X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworker%2Fabstract-worker.ts;h=176a5893caeeec7cb9d82095840b97498b3822b0;hb=09a6305fb250c17cb2565f8cbe3d9afbb33f307c;hp=5390ec029719549aeef5c31791b22e3365eeaa4c;hpb=48ef910747c7eafb18ebfff83d8eb24e5f5be26c;p=poolifier.git diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 5390ec02..176a5893 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -3,14 +3,16 @@ import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import type { MessageValue, + TaskFunctions, WorkerAsyncFunction, WorkerFunction, WorkerSyncFunction } from '../utility-types' -import { EMPTY_FUNCTION } from '../utils' +import { EMPTY_FUNCTION, isPlainObject } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' import { KillBehaviors } from './worker-options' +const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT @@ -26,6 +28,10 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { + /** + * Task function(s) processed by the worker when the pool's `execution` function is invoked. + */ + protected taskFunctions!: Map> /** * Timestamp of the last task processed by this worker. */ @@ -39,14 +45,16 @@ export abstract class AbstractWorker< * * @param type - The type of async event. * @param isMain - Whether this is the main worker or not. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function. * @param mainWorker - Reference to main worker. * @param opts - Options for the worker. */ public constructor ( type: string, protected readonly isMain: boolean, - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, protected mainWorker: MainWorker | undefined | null, protected readonly opts: WorkerOptions = { /** @@ -62,7 +70,7 @@ export abstract class AbstractWorker< ) { super(type) this.checkWorkerOptions(this.opts) - this.checkFunctionInput(fn) + this.checkTaskFunctions(taskFunctions) if (!this.isMain) { this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( @@ -72,30 +80,69 @@ export abstract class AbstractWorker< this.checkAlive.bind(this)() } - this.mainWorker?.on( - 'message', - (message: MessageValue) => { - this.messageListener(message, fn) + this.mainWorker?.on('message', this.messageListener.bind(this)) + } + + private checkWorkerOptions (opts: WorkerOptions): void { + this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR + this.opts.maxInactiveTime = + opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME + delete this.opts.async + } + + /** + * Checks if the `taskFunctions` parameter is passed to the constructor. + * + * @param taskFunctions - The task function(s) parameter that should be checked. + */ + private checkTaskFunctions ( + taskFunctions: + | WorkerFunction + | TaskFunctions + ): void { + if (taskFunctions == null) { + throw new Error('taskFunctions parameter is mandatory') + } + this.taskFunctions = new Map>() + if (typeof taskFunctions === 'function') { + this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this)) + } else if (isPlainObject(taskFunctions)) { + let firstEntry = true + for (const [name, fn] of Object.entries(taskFunctions)) { + if (typeof fn !== 'function') { + throw new TypeError( + 'A taskFunctions parameter object value is not a function' + ) + } + this.taskFunctions.set(name, fn.bind(this)) + if (firstEntry) { + this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this)) + firstEntry = false + } } - ) + if (firstEntry) { + throw new Error('taskFunctions parameter object is empty') + } + } else { + throw new TypeError( + 'taskFunctions parameter is not a function or a plain object' + ) + } } /** * Worker message listener. * * @param message - Message received. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. */ - protected messageListener ( - message: MessageValue, - fn: WorkerFunction - ): void { + protected messageListener (message: MessageValue): void { if (message.id != null && message.data != null) { // Task message received - if (this.opts.async === true) { + const fn = this.getTaskFunction(message.name) + if (fn?.constructor.name === 'AsyncFunction') { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) } else { - this.runInAsyncScope(this.run.bind(this), this, fn, message) + this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } } else if (message.parent != null) { // Main worker reference message received @@ -107,30 +154,6 @@ export abstract class AbstractWorker< } } - private checkWorkerOptions (opts: WorkerOptions): void { - this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR - this.opts.maxInactiveTime = - opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME - this.opts.async = opts.async ?? false - } - - /** - * Checks if the `fn` parameter is passed to the constructor. - * - * @param fn - The function that should be defined. - */ - private checkFunctionInput (fn: WorkerFunction): void { - if (fn == null) throw new Error('fn parameter is mandatory') - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') - } - if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) { - throw new Error( - 'fn parameter is an async function, please set the async option to true' - ) - } - } - /** * Returns the main worker. * @@ -178,18 +201,20 @@ export abstract class AbstractWorker< * @param fn - Function that will be executed. * @param message - Input data for the given function. */ - protected run ( + protected runSync ( fn: WorkerSyncFunction, message: MessageValue ): void { try { const startTimestamp = performance.now() + const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) const res = fn(message.data) const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, id: message.id, - runTime + runTime, + waitTime }) } catch (e) { const err = this.handleError(e as Error) @@ -210,13 +235,15 @@ export abstract class AbstractWorker< message: MessageValue ): void { const startTimestamp = performance.now() + const waitTime = startTimestamp - (message.submissionTimestamp ?? 0) fn(message.data) .then(res => { const runTime = performance.now() - startTimestamp this.sendToMainWorker({ data: res, id: message.id, - runTime + runTime, + waitTime }) return null }) @@ -229,4 +256,18 @@ export abstract class AbstractWorker< }) .catch(EMPTY_FUNCTION) } + + /** + * Gets the task function in the given scope. + * + * @param name - Name of the function that will be returned. + */ + private getTaskFunction (name?: string): WorkerFunction { + name = name ?? DEFAULT_FUNCTION_NAME + const fn = this.taskFunctions.get(name) + if (fn == null) { + throw new Error(`Task function "${name}" not found`) + } + return fn + } }