X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworker%2Fabstract-worker.ts;h=7d5494deeb3bbe80d2a725bd6ef0d685ca288775;hb=646d040a3beab622286af393459777f74b1366ba;hp=bfdff391cba86be9c9399837062ac8c5523b6087;hpb=6b81370106fdec4cab2d203f6892a7d79c2cd5c2;p=poolifier.git diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index bfdff391..7d5494de 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -58,9 +58,9 @@ export abstract class AbstractWorker< */ protected statistics!: WorkerStatistics /** - * Handler id of the `aliveInterval` worker alive check. + * Handler id of the `activeInterval` worker activity check. */ - protected aliveInterval?: NodeJS.Timeout + protected activeInterval?: NodeJS.Timeout /** * Constructs a new poolifier worker. * @@ -83,7 +83,7 @@ export abstract class AbstractWorker< */ killBehavior: DEFAULT_KILL_BEHAVIOR, /** - * The maximum time to keep this worker alive while idle. + * The maximum time to keep this worker active while idle. * The pool automatically checks and terminates this worker when the time expires. */ maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME @@ -119,7 +119,15 @@ export abstract class AbstractWorker< } this.taskFunctions = new Map>() if (typeof taskFunctions === 'function') { - this.taskFunctions.set(DEFAULT_TASK_NAME, taskFunctions.bind(this)) + const boundFn = taskFunctions.bind(this) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) + this.taskFunctions.set( + typeof taskFunctions.name === 'string' && + taskFunctions.name.trim().length > 0 + ? taskFunctions.name + : 'fn1', + boundFn + ) } else if (isPlainObject(taskFunctions)) { let firstEntry = true for (const [name, fn] of Object.entries(taskFunctions)) { @@ -133,11 +141,12 @@ export abstract class AbstractWorker< 'A taskFunctions parameter object value is not a function' ) } - this.taskFunctions.set(name, fn.bind(this)) + const boundFn = fn.bind(this) if (firstEntry) { - this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) firstEntry = false } + this.taskFunctions.set(name, boundFn) } if (firstEntry) { throw new Error('taskFunctions parameter object is empty') @@ -190,13 +199,14 @@ export abstract class AbstractWorker< throw new TypeError('fn parameter is not a function') } try { + const boundFn = fn.bind(this) if ( this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME) ) { - this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) } - this.taskFunctions.set(name, fn.bind(this)) + this.taskFunctions.set(name, boundFn) return true } catch { return false @@ -232,7 +242,16 @@ export abstract class AbstractWorker< } /** - * Sets the default task function to use when no task function name is specified. + * Lists the names of the worker's task functions. + * + * @returns The names of the worker's task functions. + */ + public listTaskFunctions (): string[] { + return Array.from(this.taskFunctions.keys()) + } + + /** + * Sets the default task function to use in the worker. * * @param name - The name of the task function to use as default task function. * @returns Whether the default task function was set or not. @@ -257,10 +276,7 @@ export abstract class AbstractWorker< try { this.taskFunctions.set( DEFAULT_TASK_NAME, - this.taskFunctions.get(name)?.bind(this) as WorkerFunction< - Data, - Response - > + this.taskFunctions.get(name) as WorkerFunction ) return true } catch { @@ -277,54 +293,53 @@ export abstract class AbstractWorker< if (message.workerId === this.id) { if (message.ready != null) { // Startup message received - this.workerReady() + this.sendReadyResponse() } else if (message.statistics != null) { // Statistics message received this.statistics = message.statistics - } else if (message.checkAlive != null) { - // Check alive message received - message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive() + } else if (message.checkActive != null) { + // Check active message received + message.checkActive ? this.startCheckActive() : this.stopCheckActive() } else if (message.id != null && message.data != null) { // Task message received this.run(message) } else if (message.kill === true) { // Kill message received - this.stopCheckAlive() + this.stopCheckActive() this.emitDestroy() } } } /** - * Notifies the main worker that this worker is ready to process tasks. + * Sends the ready response to the main worker. */ - protected workerReady (): void { + protected sendReadyResponse (): void { !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id }) } /** - * Starts the worker alive check interval. + * Starts the worker check active interval. */ - private startCheckAlive (): void { + private startCheckActive (): void { this.lastTaskTimestamp = performance.now() - this.aliveInterval = setInterval( - this.checkAlive.bind(this), + this.activeInterval = setInterval( + this.checkActive.bind(this), (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 - ) - this.checkAlive.bind(this)() + ).unref() } /** - * Stops the worker alive check interval. + * Stops the worker check active interval. */ - private stopCheckAlive (): void { - this.aliveInterval != null && clearInterval(this.aliveInterval) + private stopCheckActive (): void { + this.activeInterval != null && clearInterval(this.activeInterval) } /** * Checks if the worker should be terminated, because its living too long. */ - private checkAlive (): void { + private checkActive (): void { if ( performance.now() - this.lastTaskTimestamp > (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) @@ -411,7 +426,7 @@ export abstract class AbstractWorker< id: task.id }) } finally { - if (!this.isMain && this.aliveInterval != null) { + if (!this.isMain && this.activeInterval != null) { this.lastTaskTimestamp = performance.now() } } @@ -452,7 +467,7 @@ export abstract class AbstractWorker< }) }) .finally(() => { - if (!this.isMain && this.aliveInterval != null) { + if (!this.isMain && this.activeInterval != null) { this.lastTaskTimestamp = performance.now() } })