X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworker%2Fabstract-worker.ts;h=7d81a88ccc816803bcdcdae4f89cb9dd56486f51;hb=bff4b6ed4419b66ebeb64e73c39b0054cf015a70;hp=8df09ee02f1ed5078f10393d48c4c84d3428683a;hpb=e102732c0e3966b81834b2c0bdd087eb051162ad;p=poolifier.git diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8df09ee0..7d81a88c 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,4 +1,5 @@ import { AsyncResource } from 'node:async_hooks' +import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import { performance } from 'node:perf_hooks' import type { @@ -6,7 +7,7 @@ import type { TaskPerformance, WorkerStatistics } from '../utility-types' -import { EMPTY_FUNCTION, isPlainObject } from '../utils' +import { EMPTY_FUNCTION, isAsyncFunction, isPlainObject } from '../utils' import { type KillBehavior, KillBehaviors, @@ -31,10 +32,14 @@ const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data. */ export abstract class AbstractWorker< - MainWorker extends NodeJS.Process | MessagePort, + MainWorker extends Worker | MessagePort, Data = unknown, Response = unknown > extends AsyncResource { + /** + * Worker id. + */ + protected abstract id: number /** * Task function(s) processed by the worker when the pool's `execution` function is invoked. */ @@ -66,7 +71,7 @@ export abstract class AbstractWorker< taskFunctions: | WorkerFunction | TaskFunctions, - protected mainWorker: MainWorker, + protected readonly mainWorker: MainWorker, protected readonly opts: WorkerOptions = { /** * The kill behavior option on this worker or its default value. @@ -89,8 +94,8 @@ export abstract class AbstractWorker< (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2 ) this.checkAlive.bind(this)() + this.mainWorker?.on('message', this.messageListener.bind(this)) } - this.mainWorker?.on('message', this.messageListener.bind(this)) } private checkWorkerOptions (opts: WorkerOptions): void { @@ -145,20 +150,15 @@ export abstract class AbstractWorker< * * @param message - Message received. */ - protected messageListener ( - message: MessageValue - ): void { + protected messageListener (message: MessageValue): void { if (message.id != null && message.data != null) { // Task message received const fn = this.getTaskFunction(message.name) - if (fn?.constructor.name === 'AsyncFunction') { + if (isAsyncFunction(fn)) { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) } else { this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } - } else if (message.parent != null) { - // Main worker reference message received - this.mainWorker = message.parent } else if (message.statistics != null) { // Statistics message received this.statistics = message.statistics @@ -206,10 +206,10 @@ export abstract class AbstractWorker< * Handles an error and convert it to a string so it can be sent back to the main worker. * * @param e - The error raised by the worker. - * @returns Message of the error. + * @returns The error message. */ protected handleError (e: Error | string): string { - return e as string + return e instanceof Error ? e.message : e } /** @@ -229,13 +229,15 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) } catch (e) { - const err = this.handleError(e as Error) + const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - message: err, + workerId: this.id, + message: errorMessage, data: message.data }, id: message.id @@ -262,15 +264,17 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) return null }) .catch(e => { - const err = this.handleError(e as Error) + const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - message: err, + workerId: this.id, + message: errorMessage, data: message.data }, id: message.id