From 5c4d16da7677746e563fdcfe7f82cbb842d1c9e6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 9 Jul 2023 23:19:13 +0200 Subject: [PATCH] refactor: cleanup task handling in worker code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/index.ts | 2 +- src/pools/abstract-pool.ts | 7 ++-- src/pools/worker-node.ts | 2 +- src/pools/worker.ts | 30 +---------------- src/utility-types.ts | 31 ++++++++++++++++- src/worker/abstract-worker.ts | 63 +++++++++++++++++++++-------------- 6 files changed, 76 insertions(+), 59 deletions(-) diff --git a/src/index.ts b/src/index.ts index 54c352fc..8a3cb9b1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,7 +24,6 @@ export type { MeasurementStatistics, MessageHandler, OnlineHandler, - Task, TaskStatistics, WorkerInfo, WorkerType, @@ -61,6 +60,7 @@ export type { export type { MessageValue, PromiseResponseWrapper, + Task, TaskError, TaskPerformance, WorkerStatistics diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 615cf296..026bf918 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,10 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' -import type { MessageValue, PromiseResponseWrapper } from '../utility-types' +import type { + MessageValue, + PromiseResponseWrapper, + Task +} from '../utility-types' import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, @@ -25,7 +29,6 @@ import type { IWorker, IWorkerNode, MessageHandler, - Task, WorkerInfo, WorkerType, WorkerUsage diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index a5cbbbfb..5c172d8a 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,9 +1,9 @@ import { CircularArray } from '../circular-array' import { Queue } from '../queue' +import type { Task } from '../utility-types' import { type IWorker, type IWorkerNode, - type Task, type WorkerInfo, type WorkerType, WorkerTypes, diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 055a326a..58610eaa 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,4 +1,5 @@ import type { CircularArray } from '../circular-array' +import type { Task } from '../utility-types' /** * Callback invoked if the worker has received a message. @@ -29,35 +30,6 @@ export type ExitHandler = ( exitCode: number ) => void -/** - * Message object that is passed as a task between main worker and worker. - * - * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. - * @internal - */ -export interface Task { - /** - * Worker id. - */ - readonly workerId: number - /** - * Task name. - */ - readonly name?: string - /** - * Task input data that will be passed to the worker. - */ - readonly data?: Data - /** - * Timestamp. - */ - readonly timestamp?: number - /** - * Message UUID. - */ - readonly id?: string -} - /** * Measurement statistics. * diff --git a/src/utility-types.ts b/src/utility-types.ts index fa1c0ffe..28bf19e2 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,6 +1,6 @@ import type { EventLoopUtilization } from 'node:perf_hooks' import type { KillBehavior } from './worker/worker-options' -import type { IWorker, Task } from './pools/worker' +import type { IWorker } from './pools/worker' /** * Task error. @@ -56,6 +56,35 @@ export interface WorkerStatistics { elu: boolean } +/** + * Message object that is passed as a task between main worker and worker. + * + * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. + * @internal + */ +export interface Task { + /** + * Worker id. + */ + readonly workerId: number + /** + * Task name. + */ + readonly name?: string + /** + * Task input data that will be passed to the worker. + */ + readonly data?: Data + /** + * Timestamp. + */ + readonly timestamp?: number + /** + * Message UUID. + */ + readonly id?: string +} + /** * Message object that is passed between main worker and worker. * diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 5e714487..7aa626f1 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -4,6 +4,7 @@ import type { MessagePort } from 'node:worker_threads' import { performance } from 'node:perf_hooks' import type { MessageValue, + Task, TaskPerformance, WorkerStatistics } from '../utility-types' @@ -280,12 +281,7 @@ export abstract class AbstractWorker< message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive() } else if (message.id != null && message.data != null) { // Task message received - const fn = this.getTaskFunction(message.name) - if (isAsyncFunction(fn)) { - this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) - } else { - this.runInAsyncScope(this.runSync.bind(this), this, fn, message) - } + this.run(message) } else if (message.kill === true) { // Kill message received this.stopCheckAlive() @@ -363,36 +359,51 @@ export abstract class AbstractWorker< return e instanceof Error ? e.message : e } + /** + * Runs the given task. + * + * @param task - The task to execute. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found. + */ + protected run (task: Task): void { + const fn = this.getTaskFunction(task.name) + if (isAsyncFunction(fn)) { + this.runInAsyncScope(this.runAsync.bind(this), this, fn, task) + } else { + this.runInAsyncScope(this.runSync.bind(this), this, fn, task) + } + } + /** * Runs the given function synchronously. * - * @param fn - Function that will be executed. - * @param message - Input data for the given function. + * @param fn - Task function that will be executed. + * @param task - Input data for the task function. */ protected runSync ( fn: WorkerSyncFunction, - message: MessageValue + task: Task ): void { try { - let taskPerformance = this.beginTaskPerformance(message.name) - const res = fn(message.data) + let taskPerformance = this.beginTaskPerformance(task.name) + const res = fn(task.data) taskPerformance = this.endTaskPerformance(taskPerformance) this.sendToMainWorker({ data: res, taskPerformance, workerId: this.id, - id: message.id + id: task.id }) } catch (e) { const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - name: message.name ?? DEFAULT_TASK_NAME, + name: task.name ?? DEFAULT_TASK_NAME, message: errorMessage, - data: message.data + data: task.data }, workerId: this.id, - id: message.id + id: task.id }) } finally { if (!this.isMain && this.aliveInterval != null) { @@ -404,22 +415,22 @@ export abstract class AbstractWorker< /** * Runs the given function asynchronously. * - * @param fn - Function that will be executed. - * @param message - Input data for the given function. + * @param fn - Task function that will be executed. + * @param task - Input data for the task function. */ protected runAsync ( fn: WorkerAsyncFunction, - message: MessageValue + task: Task ): void { - let taskPerformance = this.beginTaskPerformance(message.name) - fn(message.data) + let taskPerformance = this.beginTaskPerformance(task.name) + fn(task.data) .then(res => { taskPerformance = this.endTaskPerformance(taskPerformance) this.sendToMainWorker({ data: res, taskPerformance, workerId: this.id, - id: message.id + id: task.id }) return null }) @@ -427,12 +438,12 @@ export abstract class AbstractWorker< const errorMessage = this.handleError(e as Error | string) this.sendToMainWorker({ taskError: { - name: message.name ?? DEFAULT_TASK_NAME, + name: task.name ?? DEFAULT_TASK_NAME, message: errorMessage, - data: message.data + data: task.data }, workerId: this.id, - id: message.id + id: task.id }) }) .finally(() => { @@ -444,9 +455,11 @@ export abstract class AbstractWorker< } /** - * Gets the task function in the given scope. + * Gets the task function with the given name. * * @param name - Name of the task function that will be returned. + * @returns The task function. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found. */ private getTaskFunction (name?: string): WorkerFunction { name = name ?? DEFAULT_TASK_NAME -- 2.34.1