From f4289ecb624ab84f01cf82e9177befaab2b50e28 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 1 Jul 2025 15:24:02 +0200 Subject: [PATCH] feat!: add abortable task support (#1625) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * refactor: make worker a first class event emitter Signed-off-by: Jérôme Benoit * docs: fix API documentation Signed-off-by: Jérôme Benoit * docs: move changelog entry at the beginning Signed-off-by: Jérôme Benoit * build(ci): silence linter errors Signed-off-by: Jérôme Benoit * build: fix linter errors Signed-off-by: Jérôme Benoit * build: silence linter error Signed-off-by: Jérôme Benoit * feat: add delete op to priority queue Signed-off-by: Jérôme Benoit * test: add priority queue delete() test Signed-off-by: Jérôme Benoit * docs: add missing @param to workerNode.deleteTask() Signed-off-by: Jérôme Benoit * docs: add code comment Signed-off-by: Jérôme Benoit * refactor: silencer linter Signed-off-by: Jérôme Benoit * fix: fix queues delete() implementation Signed-off-by: Jérôme Benoit * fix: fix delete() in fixed queue Signed-off-by: Jérôme Benoit * Update src/priority-queue.ts * Update src/priority-queue.ts * Update src/priority-queue.ts * test: improve priority queue delete() coverage Signed-off-by: Jérôme Benoit * build(ci): silence linting errors Signed-off-by: Jérôme Benoit * refactor: code reformatting Signed-off-by: Jérôme Benoit * refactor: code reformatting Signed-off-by: Jérôme Benoit * refactor: code formatting Signed-off-by: Jérôme Benoit * chore: fix merge issues Signed-off-by: Jérôme Benoit * docs(api): formatting Signed-off-by: Jérôme Benoit * chore: merge eslint-plugin-perfectionist reformatting Signed-off-by: Jérôme Benoit * [autofix.ci] apply automated fixes * chore: fix mismerge Signed-off-by: Jérôme Benoit * chore: fix CHANGELOG.md mismerge Signed-off-by: Jérôme Benoit * chore: silence linter Signed-off-by: Jérôme Benoit * fix: ensure abort signal is defined before usage Signed-off-by: Jérôme Benoit * refactor: code formatting Signed-off-by: Jérôme Benoit * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) * refactor: make mapExecute() properly handle one abort signal per task data Signed-off-by: Jérôme Benoit * refactor: propagate abort signal error when possible Signed-off-by: Jérôme Benoit * test: improve queue delete() coverage Signed-off-by: Jérôme Benoit * test: add abort error UT Signed-off-by: Jérôme Benoit * test: tests for tasks abortion Signed-off-by: Jérôme Benoit * refactor: fix taskId type definition Signed-off-by: Jérôme Benoit * refactor: cleanup AbortError properties Signed-off-by: Jérôme Benoit * docs(README.md): refinement Signed-off-by: Jérôme Benoit * fix: ensure task abortion play nice with task stealing Signed-off-by: Jérôme Benoit * fix: cascaded tasks abortion is legit Signed-off-by: Jérôme Benoit --------- Signed-off-by: Jérôme Benoit Co-authored-by: Jérôme Benoit Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .vscode/settings.json | 1 + README.md | 1 + docs/api.md | 15 +- .../fastify-hybrid/@types/fastify/index.d.ts | 2 + .../fastify-hybrid/src/fastify-poolifier.ts | 6 +- .../@types/fastify/index.d.ts | 2 + .../src/fastify-poolifier.ts | 7 +- src/pools/abstract-pool.ts | 165 +++++++++++++++--- src/pools/pool.ts | 8 +- src/pools/utils.ts | 1 + src/pools/worker-node.ts | 8 +- src/pools/worker.ts | 17 ++ src/queues/abstract-fixed-queue.ts | 13 ++ src/queues/priority-queue.ts | 30 ++++ src/queues/queue-types.ts | 6 + src/utility-types.ts | 17 ++ src/worker/abort-error.ts | 6 + src/worker/abstract-worker.ts | 82 ++++++++- src/worker/cluster-worker.ts | 13 +- src/worker/thread-worker.ts | 9 +- src/worker/worker-types.ts | 3 + tests/pools/abstract-pool.test.mjs | 32 +++- tests/pools/cluster/fixed.test.mjs | 29 +++ tests/pools/thread/fixed.test.mjs | 33 +++- tests/pools/utils.test.mjs | 2 + tests/pools/worker-node.test.mjs | 2 + tests/queues/fixed-priority-queue.test.mjs | 35 ++++ tests/queues/fixed-queue.test.mjs | 33 ++++ tests/queues/priority-queue.test.mjs | 106 +++++++++++ tests/worker/abort-error.test.mjs | 16 ++ tests/worker/cluster-worker.test.mjs | 1 + tests/worker/thread-worker.test.mjs | 1 + 32 files changed, 645 insertions(+), 57 deletions(-) create mode 100644 src/worker/abort-error.ts create mode 100644 src/worker/worker-types.ts create mode 100644 tests/worker/abort-error.test.mjs diff --git a/.vscode/settings.json b/.vscode/settings.json index 510af97ac..e9b64b23b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,6 +3,7 @@ "source.fixAll": "explicit" }, "cSpell.words": [ + "abortable", "Alessandro", "Ardizio", "autobuild", diff --git a/README.md b/README.md index ddaa511dd..2bd85571e 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Please consult our [general guidelines](#general-guidelines). - Tasks stealing under back pressure :white_check_mark: - Tasks redistribution on worker error :white_check_mark: - Support for sync and async task function :white_check_mark: +- Support for abortable task function :white_check_mark: - Support for multiple task functions with per task function queuing priority and tasks distribution strategy :white_check_mark: - Support for task functions [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) operations at runtime :white_check_mark: - General guidelines on pool choice :white_check_mark: diff --git a/docs/api.md b/docs/api.md index 12b552af2..79243daa2 100644 --- a/docs/api.md +++ b/docs/api.md @@ -5,8 +5,8 @@ - [Pool](#pool) - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts) - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts) - - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) - - [`pool.mapExecute(data, name, transferList)`](#poolmapexecutedata-name-transferlist) + - [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist) + - [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist) - [`pool.start()`](#poolstart) - [`pool.destroy()`](#pooldestroy) - [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname) @@ -38,18 +38,20 @@ `filePath` (mandatory) Path to a file with a worker implementation. `opts` (optional) An object with the pool options properties described below. -### `pool.execute(data, name, transferList)` +### `pool.execute(data, name, abortSignal, transferList)` `data` (optional) An object that you want to pass to your worker task function implementation. `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'` +`abortSignal` (optional) An abort signal to abort the task function execution. `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation. This method is available on both pool implementations and returns a promise with the task function execution response. -### `pool.mapExecute(data, name, transferList)` +### `pool.mapExecute(data, name, abortSignals, transferList)` -`data` Iterable objects that you want to pass to your worker task function implementation. +`data` An iterable of objects that you want to pass to your worker task function implementation. `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'` +`abortSignals` (optional) An iterable of AbortSignal to abort the matching object task function execution. `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation. This method is available on both pool implementations and returns a promise with the task function execution responses array. @@ -105,7 +107,6 @@ An object with these properties: Default: `() => {}` - `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool: - - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executing and queued tasks - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks execution time @@ -119,7 +120,6 @@ An object with these properties: - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool. Properties: - - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) runtime instead of the tasks simple moving average runtime in worker choice strategies. - `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) wait time instead of the tasks simple moving average wait time in worker choice strategies. @@ -139,7 +139,6 @@ An object with these properties: - `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool. Properties: - - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer. - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer. - `taskStealing` (optional) - Task stealing enablement on idle. diff --git a/examples/typescript/http-server-pool/fastify-hybrid/@types/fastify/index.d.ts b/examples/typescript/http-server-pool/fastify-hybrid/@types/fastify/index.d.ts index 2ffda7085..5d61b6eaf 100644 --- a/examples/typescript/http-server-pool/fastify-hybrid/@types/fastify/index.d.ts +++ b/examples/typescript/http-server-pool/fastify-hybrid/@types/fastify/index.d.ts @@ -9,11 +9,13 @@ declare module 'fastify' { execute: ( data?: ThreadWorkerData, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ) => Promise mapExecute: ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ) => Promise pool: DynamicThreadPool diff --git a/examples/typescript/http-server-pool/fastify-hybrid/src/fastify-poolifier.ts b/examples/typescript/http-server-pool/fastify-hybrid/src/fastify-poolifier.ts index 9808af10a..ef247f7cd 100644 --- a/examples/typescript/http-server-pool/fastify-hybrid/src/fastify-poolifier.ts +++ b/examples/typescript/http-server-pool/fastify-hybrid/src/fastify-poolifier.ts @@ -40,9 +40,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback = ( async ( data?: ThreadWorkerData, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ): Promise => - await pool.execute(data, name, transferList) + await pool.execute(data, name, abortSignal, transferList) ) } if (!fastify.hasDecorator('mapExecute')) { @@ -51,9 +52,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback = ( async ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ): Promise => - await pool.mapExecute(data, name, transferList) + await pool.mapExecute(data, name, abortSignals, transferList) ) } done() diff --git a/examples/typescript/http-server-pool/fastify-worker_threads/@types/fastify/index.d.ts b/examples/typescript/http-server-pool/fastify-worker_threads/@types/fastify/index.d.ts index 7e46d4218..bd52a2f83 100644 --- a/examples/typescript/http-server-pool/fastify-worker_threads/@types/fastify/index.d.ts +++ b/examples/typescript/http-server-pool/fastify-worker_threads/@types/fastify/index.d.ts @@ -9,11 +9,13 @@ declare module 'fastify' { execute: ( data?: WorkerData, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ) => Promise mapExecute: ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ) => Promise pool: DynamicThreadPool diff --git a/examples/typescript/http-server-pool/fastify-worker_threads/src/fastify-poolifier.ts b/examples/typescript/http-server-pool/fastify-worker_threads/src/fastify-poolifier.ts index 2273b0317..1d97e64c1 100644 --- a/examples/typescript/http-server-pool/fastify-worker_threads/src/fastify-poolifier.ts +++ b/examples/typescript/http-server-pool/fastify-worker_threads/src/fastify-poolifier.ts @@ -40,8 +40,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback = ( async ( data?: WorkerData, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] - ): Promise => await pool.execute(data, name, transferList) + ): Promise => + await pool.execute(data, name, abortSignal, transferList) ) } if (!fastify.hasDecorator('mapExecute')) { @@ -50,9 +52,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback = ( async ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ): Promise => - await pool.mapExecute(data, name, transferList) + await pool.mapExecute(data, name, abortSignals, transferList) ) } done() diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index dad20de3d..695861966 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -667,6 +667,7 @@ export abstract class AbstractPool< public async execute ( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ): Promise { if (!this.started) { @@ -681,10 +682,13 @@ export abstract class AbstractPool< if (name != null && typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } + if (abortSignal != null && !(abortSignal instanceof AbortSignal)) { + throw new TypeError('abortSignal argument must be an AbortSignal') + } if (transferList != null && !Array.isArray(transferList)) { throw new TypeError('transferList argument must be an array') } - return await this.internalExecute(data, name, transferList) + return await this.internalExecute(data, name, abortSignal, transferList) } /** @inheritDoc */ @@ -711,6 +715,7 @@ export abstract class AbstractPool< public async mapExecute ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ): Promise { if (!this.started) { @@ -732,15 +737,42 @@ export abstract class AbstractPool< if (name != null && typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } - if (transferList != null && !Array.isArray(transferList)) { - throw new TypeError('transferList argument must be an array') - } if (!Array.isArray(data)) { data = [...data] } + if (abortSignals != null) { + if (typeof abortSignals[Symbol.iterator] !== 'function') { + throw new TypeError('abortSignals argument must be an iterable') + } + for (const abortSignal of abortSignals) { + if (!(abortSignal instanceof AbortSignal)) { + throw new TypeError( + 'abortSignals argument must be an iterable of AbortSignal' + ) + } + } + if (!Array.isArray(abortSignals)) { + abortSignals = [...abortSignals] + } + if ((data as Data[]).length !== (abortSignals as AbortSignal[]).length) { + throw new Error( + 'data and abortSignals arguments must have the same length' + ) + } + } + if (transferList != null && !Array.isArray(transferList)) { + throw new TypeError('transferList argument must be an array') + } + const tasks: [Data, AbortSignal | undefined][] = Array.from( + { length: (data as Data[]).length }, + (_, i) => [ + (data as Data[])[i], + abortSignals != null ? (abortSignals as AbortSignal[])[i] : undefined, + ] + ) return await Promise.all( - (data as Data[]).map(data => - this.internalExecute(data, name, transferList) + tasks.map(([data, abortSignal]) => + this.internalExecute(data, name, abortSignal, transferList) ) ) } @@ -969,6 +1001,7 @@ export abstract class AbstractPool< ) } } + this.workerNodes[workerNodeKey].on('abortTask', this.abortTask) } /** @@ -1318,6 +1351,43 @@ export abstract class AbstractPool< } } + private readonly abortTask = (eventDetail: WorkerNodeEventDetail): void => { + if (!this.started || this.destroying) { + return + } + const { taskId, workerId } = eventDetail + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const promiseResponse = this.promiseResponseMap.get(taskId!) + if (promiseResponse == null) { + return + } + const { abortSignal, reject } = promiseResponse + if (abortSignal?.aborted === false) { + return + } + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] + if (!workerNode.info.ready) { + return + } + if (this.opts.enableTasksQueue === true) { + for (const task of workerNode.tasksQueue) { + const { abortable, name } = task + if (taskId === task.taskId && abortable === true) { + workerNode.info.queuedTaskAbortion = true + workerNode.deleteTask(task) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.delete(taskId!) + workerNode.info.queuedTaskAbortion = false + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + reject(this.getAbortError(name!, taskId!)) + return + } + } + } + this.sendToWorker(workerNodeKey, { taskId, taskOperation: 'abort' }) + } + /** * Adds the given worker node in the pool worker nodes. * @param workerNode - The worker node. @@ -1559,8 +1629,9 @@ export abstract class AbstractPool< * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { + const { transferList } = task this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(workerNodeKey, task, task.transferList) + this.sendToWorker(workerNodeKey, task, transferList) this.checkAndEmitTaskExecutionEvents() } @@ -1570,6 +1641,19 @@ export abstract class AbstractPool< } } + private readonly getAbortError = ( + taskName: string, + taskId: `${string}-${string}-${string}-${string}-${string}` + ): Error => { + const abortError = this.promiseResponseMap.get(taskId)?.abortSignal + ?.reason as Error | string + return abortError instanceof Error + ? abortError + : typeof abortError === 'string' + ? new Error(abortError) + : new Error(`Task '${taskName}' id '${taskId}' aborted`) + } + /** * Gets task function worker choice strategy, if any. * @param name - The task function name. @@ -1691,7 +1775,8 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - const error = this.handleWorkerError(workerError) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const error = this.handleWorkerError(taskId!, workerError) asyncResource != null ? asyncResource.runInAsyncScope(reject, this.emitter, error) : reject(error) @@ -1729,13 +1814,21 @@ export abstract class AbstractPool< } } - private handleWorkerError (workerError: WorkerError): Error { - if (workerError.error != null) { - return workerError.error + private readonly handleWorkerError = ( + taskId: `${string}-${string}-${string}-${string}-${string}`, + workerError: WorkerError + ): Error => { + const { aborted, error, message, name, stack } = workerError + if (aborted) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.getAbortError(name!, taskId) + } + if (error != null) { + return error } - const error = new Error(workerError.message) - error.stack = workerError.stack - return error + const wError = new Error(message) + wError.stack = stack + return wError } private readonly handleWorkerNodeBackPressureEvent = ( @@ -1896,12 +1989,14 @@ export abstract class AbstractPool< private async internalExecute ( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ): Promise { return await new Promise((resolve, reject) => { const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { + abortable: abortSignal != null, data: data ?? ({} as Data), name: name ?? DEFAULT_TASK_NAME, priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), @@ -1913,6 +2008,30 @@ export abstract class AbstractPool< timestamp, transferList, } + abortSignal?.addEventListener( + 'abort', + () => { + this.workerNodes[workerNodeKey].emit('abortTask', { + taskId: task.taskId, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerId: this.getWorkerInfo(workerNodeKey)!.id!, + }) + }, + { once: true } + ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.set(task.taskId!, { + reject, + resolve, + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + requireManualDestroy: true, + triggerAsyncId: this.emitter.asyncId, + }), + }), + abortSignal, + }) if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && @@ -1922,20 +2041,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - queueMicrotask(() => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.promiseResponseMap.set(task.taskId!, { - reject, - resolve, - workerNodeKey, - ...(this.emitter != null && { - asyncResource: new AsyncResource('poolifier:task', { - requireManualDestroy: true, - triggerAsyncId: this.emitter.asyncId, - }), - }), - }) - }) }) } @@ -2275,9 +2380,11 @@ export abstract class AbstractPool< !sourceWorkerNode.info.ready || sourceWorkerNode.info.stolen || sourceWorkerNode.info.stealing || + sourceWorkerNode.info.queuedTaskAbortion || !destinationWorkerNode.info.ready || destinationWorkerNode.info.stolen || - destinationWorkerNode.info.stealing + destinationWorkerNode.info.stealing || + destinationWorkerNode.info.queuedTaskAbortion ) { return } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 96050444a..ba9d3b479 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -132,12 +132,14 @@ export interface IPool< * Executes the specified function in the worker constructor with the task data input parameter. * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data. * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. - * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. + * @param abortSignal - The optional AbortSignal to abort the task. + * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. * @returns Promise with a task function response that will be fulfilled when the task is completed. */ readonly execute: ( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[] ) => Promise /** @@ -159,12 +161,14 @@ export interface IPool< * Executes the specified function in the worker constructor with the tasks data iterable input parameter. * @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data. * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. - * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. + * @param abortSignals - The optional iterable of AbortSignal to abort the tasks iterable. + * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. * @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed. */ readonly mapExecute: ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[] ) => Promise /** diff --git a/src/pools/utils.ts b/src/pools/utils.ts index baa4e4480..d708a5cf5 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -440,6 +440,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => { continuousStealing: false, dynamic: false, id: getWorkerId(worker), + queuedTaskAbortion: false, ready: false, stealing: false, stolen: false, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 6c7235b8c..8e33add0f 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -39,6 +39,8 @@ export class WorkerNode /** @inheritdoc */ public strategyData?: StrategyData /** @inheritdoc */ + public readonly tasksQueue: PriorityQueue> + /** @inheritdoc */ public tasksQueueBackPressureSize: number /** @inheritdoc */ public usage: WorkerUsage @@ -46,7 +48,6 @@ export class WorkerNode public readonly worker: Worker private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map - private readonly tasksQueue: PriorityQueue> /** * Constructs a new worker node. @@ -81,6 +82,11 @@ export class WorkerNode this.tasksQueue.clear() } + /** @inheritdoc */ + public deleteTask (task: Task): boolean { + return this.tasksQueue.delete(task) + } + /** @inheritdoc */ public deleteTaskFunctionWorkerUsage (name: string): boolean { return this.taskFunctionsUsage.delete(name) diff --git a/src/pools/worker.ts b/src/pools/worker.ts index ff3a5c828..edc659d37 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -2,6 +2,7 @@ import type { EventEmitter } from 'node:events' import type { MessageChannel, WorkerOptions } from 'node:worker_threads' import type { CircularBuffer } from '../circular-buffer.js' +import type { PriorityQueue } from '../queues/priority-queue.js' import type { Task, TaskFunctionProperties } from '../utility-types.js' /** @@ -194,6 +195,12 @@ export interface IWorkerNode * Clears tasks queue. */ readonly clearTasksQueue: () => void + /** + * Deletes a task from the tasks queue. + * @param task - The task to delete. + * @returns `true` if the task was deleted, `false` otherwise. + */ + readonly deleteTask: (task: Task) => boolean /** * Deletes task function worker usage statistics. * @param name - The task function name. @@ -259,6 +266,10 @@ export interface IWorkerNode * This is used to store data that are specific to the worker choice strategy. */ strategyData?: StrategyData + /** + * Tasks queue. + */ + readonly tasksQueue: PriorityQueue> /** * Tasks queue back pressure size. * This is the number of tasks that can be enqueued before the worker node has back pressure. @@ -319,6 +330,11 @@ export interface WorkerInfo { * Worker id. */ readonly id: number | undefined + /** + * Queued task abortion flag. + * This flag is set to `true` when worker node is aborting a queued task. + */ + queuedTaskAbortion: boolean /** * Ready flag. */ @@ -348,6 +364,7 @@ export interface WorkerInfo { * @internal */ export interface WorkerNodeEventDetail { + taskId?: `${string}-${string}-${string}-${string}-${string}` workerId?: number workerNodeKey?: number } diff --git a/src/queues/abstract-fixed-queue.ts b/src/queues/abstract-fixed-queue.ts index b3090adff..9bbd32fe4 100644 --- a/src/queues/abstract-fixed-queue.ts +++ b/src/queues/abstract-fixed-queue.ts @@ -36,6 +36,19 @@ export abstract class AbstractFixedQueue implements IFixedQueue { this.size = 0 } + /** @inheritdoc */ + public delete (data: T): boolean { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + const index = this.nodeArray.findIndex(node => node?.data === data) + if (index !== -1) { + this.nodeArray.splice(index, 1) + this.nodeArray.length = this.capacity + --this.size + return true + } + return false + } + /** @inheritdoc */ public dequeue (): T | undefined { if (this.empty()) { diff --git a/src/queues/priority-queue.ts b/src/queues/priority-queue.ts index 51be4266f..86961b396 100644 --- a/src/queues/priority-queue.ts +++ b/src/queues/priority-queue.ts @@ -119,6 +119,36 @@ export class PriorityQueue { this.maxSize = 0 } + /** + * Deletes the given data from the priority queue. + * @param data - Data to delete. + * @returns `true` if the data was deleted, `false` otherwise. + */ + public delete (data: T): boolean { + let node: PriorityQueueNode | undefined = this.tail + let prev: PriorityQueueNode | undefined + while (node != null) { + if (node.delete(data)) { + if (node.empty()) { + if (node === this.tail && node.next != null) { + this.tail = node.next + delete node.next + } else if (node.next != null && prev != null) { + prev.next = node.next + delete node.next + } else if (node.next == null && prev != null) { + delete prev.next + this.head = prev + } + } + return true + } + prev = node + node = node.next + } + return false + } + /** * Dequeue data from the priority queue. * @param bucket - The prioritized bucket to dequeue from. diff --git a/src/queues/queue-types.ts b/src/queues/queue-types.ts index 1e374b594..c8aba72eb 100644 --- a/src/queues/queue-types.ts +++ b/src/queues/queue-types.ts @@ -32,6 +32,12 @@ export interface IFixedQueue { * Clears the fixed queue. */ clear: () => void + /** + * Deletes the given data from the fixed priority queue. + * @param data - Data to delete. + * @returns `true` if the data was deleted, `false` otherwise. + */ + delete: (data: T) => boolean /** * Dequeue data from the fixed queue. * @returns The dequeued data or `undefined` if the fixed queue is empty. diff --git a/src/utility-types.ts b/src/utility-types.ts index 571de00c1..cd395c551 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -56,6 +56,11 @@ export interface MessageValue * Task functions properties. */ readonly taskFunctionsProperties?: TaskFunctionProperties[] + /** + * Task operation: + * - `'abort'` - Abort a task. + */ + readonly taskOperation?: 'abort' /** * Task performance. */ @@ -76,6 +81,10 @@ export interface MessageValue * @internal */ export interface PromiseResponseWrapper { + /** + * The task abort signal. + */ + readonly abortSignal?: AbortSignal /** * The asynchronous resource used to track the task execution. */ @@ -100,6 +109,10 @@ export interface PromiseResponseWrapper { * @internal */ export interface Task { + /** + * Whether the task is abortable or not. + */ + readonly abortable?: boolean /** * Task input data that will be passed to the worker. */ @@ -177,6 +190,10 @@ export interface TaskPerformance { * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ export interface WorkerError { + /** + * Whether the error is an abort error or not. + */ + readonly aborted: boolean /** * Data triggering the error. */ diff --git a/src/worker/abort-error.ts b/src/worker/abort-error.ts new file mode 100644 index 000000000..198a9691d --- /dev/null +++ b/src/worker/abort-error.ts @@ -0,0 +1,6 @@ +export class AbortError extends Error { + public constructor (message: string) { + super(message) + this.name = 'AbortError' + } +} diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index b54834d77..3b5e6af07 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -1,6 +1,7 @@ import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' +import { EventEmitter } from 'node:events' import { performance } from 'node:perf_hooks' import type { @@ -18,6 +19,7 @@ import type { TaskFunctions, TaskSyncFunction, } from './task-functions.js' +import type { AbortTaskEventDetail } from './worker-types.js' import { buildTaskFunctionProperties, @@ -26,6 +28,7 @@ import { isAsyncFunction, isPlainObject, } from '../utils.js' +import { AbortError } from './abort-error.js' import { checkTaskFunctionName, checkValidTaskFunctionObjectEntry, @@ -60,7 +63,7 @@ export abstract class AbstractWorker< MainWorker extends MessagePort | Worker, Data = unknown, Response = unknown -> { +> extends EventEmitter { /** * Handler id of the `activeInterval` worker activity check. */ @@ -79,6 +82,14 @@ export abstract class AbstractWorker< */ protected statistics?: WorkerStatistics + /** + * Task abort functions processed by the worker when task operation 'abort' is received. + */ + protected taskAbortFunctions: Map< + `${string}-${string}-${string}-${string}-${string}`, + () => void + > + /** * Task function object(s) processed by the worker when the pool's `execute` method is invoked. */ @@ -97,10 +108,21 @@ export abstract class AbstractWorker< taskFunctions: TaskFunction | TaskFunctions, protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS ) { + super() if (this.isMain == null) { throw new Error('isMain parameter is mandatory') } this.checkTaskFunctions(taskFunctions) + this.taskAbortFunctions = new Map< + `${string}-${string}-${string}-${string}-${string}`, + () => void + >() + this.on('abortTask', (eventDetail: AbortTaskEventDetail) => { + const { taskId } = eventDetail + if (this.taskAbortFunctions.has(taskId)) { + this.taskAbortFunctions.get(taskId)?.() + } + }) this.checkWorkerOptions(this.opts) if (!this.isMain) { // Should be once() but Node.js on windows has a bug that prevents it from working @@ -268,6 +290,7 @@ export abstract class AbstractWorker< * @returns The worker error object. */ protected abstract handleError (error: Error): { + aborted: boolean error?: Error message: string stack?: string @@ -296,6 +319,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ kill: 'failure' }) } } + this.removeAllListeners() } /** @@ -376,6 +400,7 @@ export abstract class AbstractWorker< statistics, taskFunctionOperation, taskId, + taskOperation, } = message if (statistics != null) { // Statistics message received @@ -389,6 +414,9 @@ export abstract class AbstractWorker< } else if (taskId != null && data != null) { // Task message received this.run(message) + } else if (taskOperation === 'abort' && taskId != null) { + // Abort task operation message received + this.emit('abortTask', { taskId }) } else if (kill === true) { // Kill message received this.handleKillMessage(message) @@ -400,7 +428,7 @@ export abstract class AbstractWorker< * @param task - The task to execute. */ protected readonly run = (task: Task): void => { - const { data, name, taskId } = task + const { abortable, data, name, taskId } = task const taskFunctionName = name ?? DEFAULT_TASK_NAME if (!this.taskFunctions.has(taskFunctionName)) { this.sendToMainWorker({ @@ -416,7 +444,14 @@ export abstract class AbstractWorker< }) return } - const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction + let fn: TaskFunction + if (abortable === true) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + fn = this.getAbortableTaskFunction(taskFunctionName, taskId!) + } else { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + fn = this.taskFunctions.get(taskFunctionName)!.taskFunction + } if (isAsyncFunction(fn)) { this.runAsync(fn as TaskAsyncFunction, task) } else { @@ -433,7 +468,7 @@ export abstract class AbstractWorker< fn: TaskAsyncFunction, task: Task ): void => { - const { data, name, taskId } = task + const { abortable, data, name, taskId } = task let taskPerformance = this.beginTaskPerformance(name) fn(data) .then(res => { @@ -457,6 +492,10 @@ export abstract class AbstractWorker< }) .finally(() => { this.updateLastTaskTimestamp() + if (abortable === true) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.taskAbortFunctions.delete(taskId!) + } }) .catch(EMPTY_FUNCTION) } @@ -470,7 +509,7 @@ export abstract class AbstractWorker< fn: TaskSyncFunction, task: Task ): void => { - const { data, name, taskId } = task + const { abortable, data, name, taskId } = task try { let taskPerformance = this.beginTaskPerformance(name) const res = fn(data) @@ -491,6 +530,10 @@ export abstract class AbstractWorker< }) } finally { this.updateLastTaskTimestamp() + if (abortable === true) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.taskAbortFunctions.delete(taskId!) + } } } @@ -624,6 +667,35 @@ export abstract class AbstractWorker< } } + /** + * Gets abortable task function. + * An abortable promise is built to permit the task to be aborted. + * @param name - The name of the task. + * @param taskId - The task id. + * @returns The abortable task function. + */ + private getAbortableTaskFunction ( + name: string, + taskId: `${string}-${string}-${string}-${string}-${string}` + ): TaskAsyncFunction { + return async (data?: Data): Promise => + await new Promise( + (resolve, reject: (reason?: unknown) => void) => { + this.taskAbortFunctions.set(taskId, () => { + reject(new AbortError(`Task '${name}' id '${taskId}' aborted`)) + }) + const taskFunction = this.taskFunctions.get(name)?.taskFunction + if (isAsyncFunction(taskFunction)) { + ;(taskFunction as TaskAsyncFunction)(data) + .then(resolve) + .catch(reject) + } else { + resolve((taskFunction as TaskSyncFunction)(data)) + } + } + ) + } + /** * Starts the worker check active interval. */ diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 394e36743..4a0b56554 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -4,6 +4,7 @@ import type { MessageValue } from '../utility-types.js' import type { TaskFunction, TaskFunctions } from './task-functions.js' import type { WorkerOptions } from './worker-options.js' +import { AbortError } from './abort-error.js' import { AbstractWorker } from './abstract-worker.js' /** @@ -43,8 +44,16 @@ export class ClusterWorker< /** * @inheritDoc */ - protected handleError (error: Error): { message: string; stack?: string } { - return { message: error.message, stack: error.stack } + protected handleError (error: Error): { + aborted: boolean + message: string + stack?: string + } { + return { + aborted: error instanceof AbortError, + message: error.message, + stack: error.stack, + } } /** @inheritDoc */ diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 9cbba3a57..2bf172b8e 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -9,6 +9,7 @@ import type { MessageValue } from '../utility-types.js' import type { TaskFunction, TaskFunctions } from './task-functions.js' import type { WorkerOptions } from './worker-options.js' +import { AbortError } from './abort-error.js' import { AbstractWorker } from './abstract-worker.js' /** @@ -54,11 +55,17 @@ export class ThreadWorker< * @inheritDoc */ protected handleError (error: Error): { + aborted: boolean error: Error message: string stack?: string } { - return { error, message: error.message, stack: error.stack } + return { + aborted: error instanceof AbortError, + error, + message: error.message, + stack: error.stack, + } } /** @inheritDoc */ diff --git a/src/worker/worker-types.ts b/src/worker/worker-types.ts new file mode 100644 index 000000000..706b155e1 --- /dev/null +++ b/src/worker/worker-types.ts @@ -0,0 +1,3 @@ +export interface AbortTaskEventDetail { + taskId: `${string}-${string}-${string}-${string}-${string}` +} diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 46487a0ba..542e491c1 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -872,6 +872,7 @@ describe('Abstract pool test suite', () => { continuousStealing: false, dynamic: false, id: expect.any(Number), + queuedTaskAbortion: false, ready: true, stealing: false, stolen: false, @@ -892,6 +893,7 @@ describe('Abstract pool test suite', () => { continuousStealing: false, dynamic: false, id: expect.any(Number), + queuedTaskAbortion: false, ready: true, stealing: false, stolen: false, @@ -959,8 +961,11 @@ describe('Abstract pool test suite', () => { new TypeError('name argument must not be an empty string') ) await expect(pool.execute(undefined, undefined, {})).rejects.toThrow( - new TypeError('transferList argument must be an array') + new TypeError('abortSignal argument must be an AbortSignal') ) + await expect( + pool.execute(undefined, undefined, new AbortController().signal, {}) + ).rejects.toThrow(new TypeError('transferList argument must be an array')) await expect(pool.execute(undefined, 'unknown')).rejects.toThrow( new Error("Task function 'unknown' not found") ) @@ -1917,9 +1922,30 @@ describe('Abstract pool test suite', () => { await expect(pool.mapExecute([undefined], '')).rejects.toThrow( new TypeError('name argument must not be an empty string') ) - await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow( - new TypeError('transferList argument must be an array') + await expect(pool.mapExecute([undefined], undefined, 0)).rejects.toThrow( + new TypeError('abortSignals argument must be an iterable') + ) + await expect( + pool.mapExecute([undefined], undefined, [undefined]) + ).rejects.toThrow( + new TypeError('abortSignals argument must be an iterable of AbortSignal') + ) + await expect( + pool.mapExecute([undefined], undefined, [ + new AbortController().signal, + new AbortController().signal, + ]) + ).rejects.toThrow( + new Error('data and abortSignals arguments must have the same length') ) + await expect( + pool.mapExecute( + [undefined], + undefined, + [new AbortController().signal], + {} + ) + ).rejects.toThrow(new TypeError('transferList argument must be an array')) await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow( new Error("Task function 'unknown' not found") ) diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index e911a60c0..e41a8a6fc 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -180,6 +180,7 @@ describe('Fixed cluster pool test suite', () => { expect(inError.message).toStrictEqual('Error Message from ClusterWorker') expect(typeof inError.stack === 'string').toBe(true) expect(taskError).toStrictEqual({ + aborted: false, data, message: inError.message, name: DEFAULT_TASK_NAME, @@ -214,6 +215,7 @@ describe('Fixed cluster pool test suite', () => { ) expect(typeof inError.stack === 'string').toBe(true) expect(taskError).toStrictEqual({ + aborted: false, data, message: inError.message, name: DEFAULT_TASK_NAME, @@ -235,6 +237,33 @@ describe('Fixed cluster pool test suite', () => { expect(usedTime).toBeGreaterThanOrEqual(2000) }) + it('Verify that task can be aborted', async () => { + let error + + try { + await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500)) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe('TimeoutError') + expect(error.message).toBe('The operation was aborted due to timeout') + expect(error.stack).toBeDefined() + + const abortController = new AbortController() + setTimeout(() => { + abortController.abort(new Error('Task aborted')) + }, 500) + try { + await asyncErrorPool.execute({}, 'default', abortController.signal) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.message).toBe('Task aborted') + expect(error.stack).toBeDefined() + }) + it('Shutdown test', async () => { const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers) expect(pool.emitter.eventNames()).toStrictEqual([]) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index dd01556fa..ac7937095 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -165,7 +165,7 @@ describe('Fixed thread pool test suite', () => { let error let result try { - result = await pool.execute(undefined, undefined, [ + result = await pool.execute(undefined, undefined, undefined, [ new ArrayBuffer(16), new MessageChannel().port1, ]) @@ -175,7 +175,7 @@ describe('Fixed thread pool test suite', () => { expect(result).toStrictEqual({ ok: 1 }) expect(error).toBeUndefined() try { - result = await pool.execute(undefined, undefined, [ + result = await pool.execute(undefined, undefined, undefined, [ new SharedArrayBuffer(16), ]) } catch (e) { @@ -206,6 +206,7 @@ describe('Fixed thread pool test suite', () => { expect(inError.message).toStrictEqual('Error Message from ThreadWorker') expect(typeof inError.stack === 'string').toBe(true) expect(taskError).toStrictEqual({ + aborted: false, data, error: inError, message: inError.message, @@ -241,6 +242,7 @@ describe('Fixed thread pool test suite', () => { ) expect(typeof inError.stack === 'string').toBe(true) expect(taskError).toStrictEqual({ + aborted: false, data, error: inError, message: inError.message, @@ -263,6 +265,33 @@ describe('Fixed thread pool test suite', () => { expect(usedTime).toBeGreaterThanOrEqual(2000) }) + it('Verify that task can be aborted', async () => { + let error + + try { + await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500)) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe('TimeoutError') + expect(error.message).toBe('The operation was aborted due to timeout') + expect(error.stack).toBeDefined() + + const abortController = new AbortController() + setTimeout(() => { + abortController.abort(new Error('Task aborted')) + }, 500) + try { + await asyncErrorPool.execute({}, 'default', abortController.signal) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.message).toBe('Task aborted') + expect(error.stack).toBeDefined() + }) + it('Shutdown test', async () => { const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads) expect(pool.emitter.eventNames()).toStrictEqual([]) diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 18e26c9cf..5f7f5102e 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -134,6 +134,7 @@ describe('Pool utils test suite', () => { continuousStealing: false, dynamic: false, id: threadWorker.threadId, + queuedTaskAbortion: false, ready: false, stealing: false, stolen: false, @@ -151,6 +152,7 @@ describe('Pool utils test suite', () => { continuousStealing: false, dynamic: false, id: clusterWorker.id, + queuedTaskAbortion: false, ready: false, stealing: false, stolen: false, diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 753cb31d6..709c67699 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -240,6 +240,7 @@ describe('Worker node test suite', () => { continuousStealing: false, dynamic: false, id: threadWorkerNode.worker.threadId, + queuedTaskAbortion: false, ready: false, stealing: false, stolen: false, @@ -302,6 +303,7 @@ describe('Worker node test suite', () => { continuousStealing: false, dynamic: false, id: clusterWorkerNode.worker.id, + queuedTaskAbortion: false, ready: false, stealing: false, stolen: false, diff --git a/tests/queues/fixed-priority-queue.test.mjs b/tests/queues/fixed-priority-queue.test.mjs index d91ea712e..1f1eb8c31 100644 --- a/tests/queues/fixed-priority-queue.test.mjs +++ b/tests/queues/fixed-priority-queue.test.mjs @@ -138,6 +138,41 @@ describe('Fixed priority queue test suite', () => { expect(fixedPriorityQueue.capacity).toBe(queueSize) }) + it('Verify delete() behavior', () => { + const fixedPriorityQueue = new FixedPriorityQueue() + fixedPriorityQueue.enqueue(1) + fixedPriorityQueue.enqueue(2, -1) + fixedPriorityQueue.enqueue(3) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(3) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 2, priority: -1 }, + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(2)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(2) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(3)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(1) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(1)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(0) + expect(fixedPriorityQueue.nodeArray).toMatchObject([]) + expect(fixedPriorityQueue.delete(2)).toBe(false) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(0) + expect(fixedPriorityQueue.nodeArray).toMatchObject([]) + }) + it('Verify iterator behavior', () => { const fixedPriorityQueue = new FixedPriorityQueue() fixedPriorityQueue.enqueue(1) diff --git a/tests/queues/fixed-queue.test.mjs b/tests/queues/fixed-queue.test.mjs index 7b1053ad7..d1231643f 100644 --- a/tests/queues/fixed-queue.test.mjs +++ b/tests/queues/fixed-queue.test.mjs @@ -136,6 +136,39 @@ describe('Fixed queue test suite', () => { expect(fixedQueue.capacity).toBe(queueSize) }) + it('Verify delete() behavior', () => { + const fixedQueue = new FixedQueue() + fixedQueue.enqueue(1) + fixedQueue.enqueue(2, -1) + fixedQueue.enqueue(3) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(3) + expect(fixedQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 2, priority: -1 }, + { data: 3, priority: 0 }, + ]) + expect(fixedQueue.delete(2)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(2) + expect(fixedQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedQueue.delete(3)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(1) + expect(fixedQueue.nodeArray).toMatchObject([{ data: 1, priority: 0 }]) + expect(fixedQueue.delete(1)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(0) + expect(fixedQueue.nodeArray).toMatchObject([]) + expect(fixedQueue.delete(2)).toBe(false) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(0) + expect(fixedQueue.nodeArray).toMatchObject([]) + }) + it('Verify iterator behavior', () => { const fixedQueue = new FixedQueue() fixedQueue.enqueue(1) diff --git a/tests/queues/priority-queue.test.mjs b/tests/queues/priority-queue.test.mjs index 48a687326..64174908e 100644 --- a/tests/queues/priority-queue.test.mjs +++ b/tests/queues/priority-queue.test.mjs @@ -300,6 +300,112 @@ describe('Priority queue test suite', () => { expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) }) + it('Verify default bucket size delete() behavior', () => { + const priorityQueue = new PriorityQueue(defaultBucketSize, true) + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(3) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(2) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(false) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + }) + + it('Verify bucketSize=2 delete() behavior', () => { + const priorityQueue = new PriorityQueue(2, true) + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + priorityQueue.enqueue(3, -1) + priorityQueue.enqueue(1, 1) + priorityQueue.enqueue(3, -2) + expect(priorityQueue.buckets).toBe(3) + expect(priorityQueue.size).toBe(6) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(true) + expect(priorityQueue.buckets).toBe(2) + expect(priorityQueue.size).toBe(5) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(2) + expect(priorityQueue.size).toBe(4) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(1) + expect(priorityQueue.size).toBe(3) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(1) + expect(priorityQueue.size).toBe(2) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(false) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + }) + it('Verify enablePriority setter behavior', () => { const priorityQueue = new PriorityQueue(2) expect(priorityQueue.enablePriority).toBe(false) diff --git a/tests/worker/abort-error.test.mjs b/tests/worker/abort-error.test.mjs new file mode 100644 index 000000000..1de5f4ed4 --- /dev/null +++ b/tests/worker/abort-error.test.mjs @@ -0,0 +1,16 @@ +import { expect } from '@std/expect' + +import { AbortError } from '../../lib/worker/abort-error.cjs' + +describe('Abort error test suite', () => { + it('Verify constructor() behavior', () => { + const errorMessage = 'This is an abort error message' + const abortError = new AbortError(errorMessage) + + expect(abortError).toBeInstanceOf(AbortError) + expect(abortError).toBeInstanceOf(Error) + expect(abortError.name).toBe('AbortError') + expect(abortError.message).toBe(errorMessage) + expect(abortError.stack).toBeDefined() + }) +}) diff --git a/tests/worker/cluster-worker.test.mjs b/tests/worker/cluster-worker.test.mjs index eaf46d222..c0df3d447 100644 --- a/tests/worker/cluster-worker.test.mjs +++ b/tests/worker/cluster-worker.test.mjs @@ -94,6 +94,7 @@ describe('Cluster worker test suite', () => { const error = new Error('Error as an error') const worker = new ClusterWorker(() => {}) expect(worker.handleError(error)).toStrictEqual({ + aborted: false, message: error.message, stack: error.stack, }) diff --git a/tests/worker/thread-worker.test.mjs b/tests/worker/thread-worker.test.mjs index 4d10cf622..49e1e572b 100644 --- a/tests/worker/thread-worker.test.mjs +++ b/tests/worker/thread-worker.test.mjs @@ -94,6 +94,7 @@ describe('Thread worker test suite', () => { const error = new Error('Error as an error') const worker = new ThreadWorker(() => {}) expect(worker.handleError(error)).toStrictEqual({ + aborted: false, error, message: error.message, stack: error.stack, -- 2.43.0