"source.fixAll": "explicit"
},
"cSpell.words": [
+ "abortable",
"Alessandro",
"Ardizio",
"autobuild",
- Tasks stealing under back pressure :white_check_mark:
- Tasks redistribution on worker error :white_check_mark:
- Support for sync and async task function :white_check_mark:
+- Support for abortable task function :white_check_mark:
- Support for multiple task functions with per task function queuing priority and tasks distribution strategy :white_check_mark:
- Support for task functions [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) operations at runtime :white_check_mark:
- General guidelines on pool choice :white_check_mark:
- [Pool](#pool)
- [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
- - [`pool.mapExecute(data, name, transferList)`](#poolmapexecutedata-name-transferlist)
+ - [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist)
+ - [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist)
- [`pool.start()`](#poolstart)
- [`pool.destroy()`](#pooldestroy)
- [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname)
`filePath` (mandatory) Path to a file with a worker implementation.
`opts` (optional) An object with the pool options properties described below.
-### `pool.execute(data, name, transferList)`
+### `pool.execute(data, name, abortSignal, transferList)`
`data` (optional) An object that you want to pass to your worker task function implementation.
`name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`
+`abortSignal` (optional) An abort signal to abort the task function execution.
`transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation.
This method is available on both pool implementations and returns a promise with the task function execution response.
-### `pool.mapExecute(data, name, transferList)`
+### `pool.mapExecute(data, name, abortSignals, transferList)`
-`data` Iterable objects that you want to pass to your worker task function implementation.
+`data` An iterable of objects that you want to pass to your worker task function implementation.
`name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`
+`abortSignals` (optional) An iterable of AbortSignal to abort the matching object task function execution.
`transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation.
This method is available on both pool implementations and returns a promise with the task function execution responses array.
Default: `() => {}`
- `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool:
-
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
- `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executing and queued tasks
- `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks execution time
- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.
Properties:
-
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) runtime instead of the tasks simple moving average runtime in worker choice strategies.
- `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) wait time instead of the tasks simple moving average wait time in worker choice strategies.
- `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool.
Properties:
-
- `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
- `taskStealing` (optional) - Task stealing enablement on idle.
execute: (
data?: ThreadWorkerData,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
) => Promise<ThreadWorkerResponse>
mapExecute: (
data: Iterable<ThreadWorkerData>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
) => Promise<ThreadWorkerResponse[]>
pool: DynamicThreadPool<ThreadWorkerData, ThreadWorkerResponse>
async (
data?: ThreadWorkerData,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
): Promise<ThreadWorkerResponse> =>
- await pool.execute(data, name, transferList)
+ await pool.execute(data, name, abortSignal, transferList)
)
}
if (!fastify.hasDecorator('mapExecute')) {
async (
data: Iterable<ThreadWorkerData>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
): Promise<ThreadWorkerResponse[]> =>
- await pool.mapExecute(data, name, transferList)
+ await pool.mapExecute(data, name, abortSignals, transferList)
)
}
done()
execute: (
data?: WorkerData,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
) => Promise<WorkerResponse>
mapExecute: (
data: Iterable<WorkerData>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
) => Promise<WorkerResponse[]>
pool: DynamicThreadPool<WorkerData, WorkerResponse>
async (
data?: WorkerData,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
- ): Promise<WorkerResponse> => await pool.execute(data, name, transferList)
+ ): Promise<WorkerResponse> =>
+ await pool.execute(data, name, abortSignal, transferList)
)
}
if (!fastify.hasDecorator('mapExecute')) {
async (
data: Iterable<WorkerData>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
): Promise<WorkerResponse[]> =>
- await pool.mapExecute(data, name, transferList)
+ await pool.mapExecute(data, name, abortSignals, transferList)
)
}
done()
public async execute (
data?: Data,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
): Promise<Response> {
if (!this.started) {
if (name != null && typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
+ if (abortSignal != null && !(abortSignal instanceof AbortSignal)) {
+ throw new TypeError('abortSignal argument must be an AbortSignal')
+ }
if (transferList != null && !Array.isArray(transferList)) {
throw new TypeError('transferList argument must be an array')
}
- return await this.internalExecute(data, name, transferList)
+ return await this.internalExecute(data, name, abortSignal, transferList)
}
/** @inheritDoc */
public async mapExecute (
data: Iterable<Data>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
): Promise<Response[]> {
if (!this.started) {
if (name != null && typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
- if (transferList != null && !Array.isArray(transferList)) {
- throw new TypeError('transferList argument must be an array')
- }
if (!Array.isArray(data)) {
data = [...data]
}
+ if (abortSignals != null) {
+ if (typeof abortSignals[Symbol.iterator] !== 'function') {
+ throw new TypeError('abortSignals argument must be an iterable')
+ }
+ for (const abortSignal of abortSignals) {
+ if (!(abortSignal instanceof AbortSignal)) {
+ throw new TypeError(
+ 'abortSignals argument must be an iterable of AbortSignal'
+ )
+ }
+ }
+ if (!Array.isArray(abortSignals)) {
+ abortSignals = [...abortSignals]
+ }
+ if ((data as Data[]).length !== (abortSignals as AbortSignal[]).length) {
+ throw new Error(
+ 'data and abortSignals arguments must have the same length'
+ )
+ }
+ }
+ if (transferList != null && !Array.isArray(transferList)) {
+ throw new TypeError('transferList argument must be an array')
+ }
+ const tasks: [Data, AbortSignal | undefined][] = Array.from(
+ { length: (data as Data[]).length },
+ (_, i) => [
+ (data as Data[])[i],
+ abortSignals != null ? (abortSignals as AbortSignal[])[i] : undefined,
+ ]
+ )
return await Promise.all(
- (data as Data[]).map(data =>
- this.internalExecute(data, name, transferList)
+ tasks.map(([data, abortSignal]) =>
+ this.internalExecute(data, name, abortSignal, transferList)
)
)
}
)
}
}
+ this.workerNodes[workerNodeKey].on('abortTask', this.abortTask)
}
/**
}
}
+ private readonly abortTask = (eventDetail: WorkerNodeEventDetail): void => {
+ if (!this.started || this.destroying) {
+ return
+ }
+ const { taskId, workerId } = eventDetail
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const promiseResponse = this.promiseResponseMap.get(taskId!)
+ if (promiseResponse == null) {
+ return
+ }
+ const { abortSignal, reject } = promiseResponse
+ if (abortSignal?.aborted === false) {
+ return
+ }
+ const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (!workerNode.info.ready) {
+ return
+ }
+ if (this.opts.enableTasksQueue === true) {
+ for (const task of workerNode.tasksQueue) {
+ const { abortable, name } = task
+ if (taskId === task.taskId && abortable === true) {
+ workerNode.info.queuedTaskAbortion = true
+ workerNode.deleteTask(task)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.delete(taskId!)
+ workerNode.info.queuedTaskAbortion = false
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ reject(this.getAbortError(name!, taskId!))
+ return
+ }
+ }
+ }
+ this.sendToWorker(workerNodeKey, { taskId, taskOperation: 'abort' })
+ }
+
/**
* Adds the given worker node in the pool worker nodes.
* @param workerNode - The worker node.
* @param task - The task to execute.
*/
private executeTask (workerNodeKey: number, task: Task<Data>): void {
+ const { transferList } = task
this.beforeTaskExecutionHook(workerNodeKey, task)
- this.sendToWorker(workerNodeKey, task, task.transferList)
+ this.sendToWorker(workerNodeKey, task, transferList)
this.checkAndEmitTaskExecutionEvents()
}
}
}
+ private readonly getAbortError = (
+ taskName: string,
+ taskId: `${string}-${string}-${string}-${string}-${string}`
+ ): Error => {
+ const abortError = this.promiseResponseMap.get(taskId)?.abortSignal
+ ?.reason as Error | string
+ return abortError instanceof Error
+ ? abortError
+ : typeof abortError === 'string'
+ ? new Error(abortError)
+ : new Error(`Task '${taskName}' id '${taskId}' aborted`)
+ }
+
/**
* Gets task function worker choice strategy, if any.
* @param name - The task function name.
const workerNode = this.workerNodes[workerNodeKey]
if (workerError != null) {
this.emitter?.emit(PoolEvents.taskError, workerError)
- const error = this.handleWorkerError(workerError)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const error = this.handleWorkerError(taskId!, workerError)
asyncResource != null
? asyncResource.runInAsyncScope(reject, this.emitter, error)
: reject(error)
}
}
- private handleWorkerError (workerError: WorkerError): Error {
- if (workerError.error != null) {
- return workerError.error
+ private readonly handleWorkerError = (
+ taskId: `${string}-${string}-${string}-${string}-${string}`,
+ workerError: WorkerError
+ ): Error => {
+ const { aborted, error, message, name, stack } = workerError
+ if (aborted) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ return this.getAbortError(name!, taskId)
+ }
+ if (error != null) {
+ return error
}
- const error = new Error(workerError.message)
- error.stack = workerError.stack
- return error
+ const wError = new Error(message)
+ wError.stack = stack
+ return wError
}
private readonly handleWorkerNodeBackPressureEvent = (
private async internalExecute (
data?: Data,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
+ abortable: abortSignal != null,
data: data ?? ({} as Data),
name: name ?? DEFAULT_TASK_NAME,
priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
timestamp,
transferList,
}
+ abortSignal?.addEventListener(
+ 'abort',
+ () => {
+ this.workerNodes[workerNodeKey].emit('abortTask', {
+ taskId: task.taskId,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ workerId: this.getWorkerInfo(workerNodeKey)!.id!,
+ })
+ },
+ { once: true }
+ )
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
+ reject,
+ resolve,
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ requireManualDestroy: true,
+ triggerAsyncId: this.emitter.asyncId,
+ }),
+ }),
+ abortSignal,
+ })
if (
this.opts.enableTasksQueue === false ||
(this.opts.enableTasksQueue === true &&
} else {
this.enqueueTask(workerNodeKey, task)
}
- queueMicrotask(() => {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.promiseResponseMap.set(task.taskId!, {
- reject,
- resolve,
- workerNodeKey,
- ...(this.emitter != null && {
- asyncResource: new AsyncResource('poolifier:task', {
- requireManualDestroy: true,
- triggerAsyncId: this.emitter.asyncId,
- }),
- }),
- })
- })
})
}
!sourceWorkerNode.info.ready ||
sourceWorkerNode.info.stolen ||
sourceWorkerNode.info.stealing ||
+ sourceWorkerNode.info.queuedTaskAbortion ||
!destinationWorkerNode.info.ready ||
destinationWorkerNode.info.stolen ||
- destinationWorkerNode.info.stealing
+ destinationWorkerNode.info.stealing ||
+ destinationWorkerNode.info.queuedTaskAbortion
) {
return
}
* Executes the specified function in the worker constructor with the task data input parameter.
* @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
* @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
- * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
+ * @param abortSignal - The optional AbortSignal to abort the task.
+ * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
* @returns Promise with a task function response that will be fulfilled when the task is completed.
*/
readonly execute: (
data?: Data,
name?: string,
+ abortSignal?: AbortSignal,
transferList?: readonly Transferable[]
) => Promise<Response>
/**
* Executes the specified function in the worker constructor with the tasks data iterable input parameter.
* @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data.
* @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
- * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
+ * @param abortSignals - The optional iterable of AbortSignal to abort the tasks iterable.
+ * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
* @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed.
*/
readonly mapExecute: (
data: Iterable<Data>,
name?: string,
+ abortSignals?: Iterable<AbortSignal>,
transferList?: readonly Transferable[]
) => Promise<Response[]>
/**
continuousStealing: false,
dynamic: false,
id: getWorkerId(worker),
+ queuedTaskAbortion: false,
ready: false,
stealing: false,
stolen: false,
/** @inheritdoc */
public strategyData?: StrategyData
/** @inheritdoc */
+ public readonly tasksQueue: PriorityQueue<Task<Data>>
+ /** @inheritdoc */
public tasksQueueBackPressureSize: number
/** @inheritdoc */
public usage: WorkerUsage
public readonly worker: Worker
private setBackPressureFlag: boolean
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
- private readonly tasksQueue: PriorityQueue<Task<Data>>
/**
* Constructs a new worker node.
this.tasksQueue.clear()
}
+ /** @inheritdoc */
+ public deleteTask (task: Task<Data>): boolean {
+ return this.tasksQueue.delete(task)
+ }
+
/** @inheritdoc */
public deleteTaskFunctionWorkerUsage (name: string): boolean {
return this.taskFunctionsUsage.delete(name)
import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
import type { CircularBuffer } from '../circular-buffer.js'
+import type { PriorityQueue } from '../queues/priority-queue.js'
import type { Task, TaskFunctionProperties } from '../utility-types.js'
/**
* Clears tasks queue.
*/
readonly clearTasksQueue: () => void
+ /**
+ * Deletes a task from the tasks queue.
+ * @param task - The task to delete.
+ * @returns `true` if the task was deleted, `false` otherwise.
+ */
+ readonly deleteTask: (task: Task<Data>) => boolean
/**
* Deletes task function worker usage statistics.
* @param name - The task function name.
* This is used to store data that are specific to the worker choice strategy.
*/
strategyData?: StrategyData
+ /**
+ * Tasks queue.
+ */
+ readonly tasksQueue: PriorityQueue<Task<Data>>
/**
* Tasks queue back pressure size.
* This is the number of tasks that can be enqueued before the worker node has back pressure.
* Worker id.
*/
readonly id: number | undefined
+ /**
+ * Queued task abortion flag.
+ * This flag is set to `true` when worker node is aborting a queued task.
+ */
+ queuedTaskAbortion: boolean
/**
* Ready flag.
*/
* @internal
*/
export interface WorkerNodeEventDetail {
+ taskId?: `${string}-${string}-${string}-${string}-${string}`
workerId?: number
workerNodeKey?: number
}
this.size = 0
}
+ /** @inheritdoc */
+ public delete (data: T): boolean {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ const index = this.nodeArray.findIndex(node => node?.data === data)
+ if (index !== -1) {
+ this.nodeArray.splice(index, 1)
+ this.nodeArray.length = this.capacity
+ --this.size
+ return true
+ }
+ return false
+ }
+
/** @inheritdoc */
public dequeue (): T | undefined {
if (this.empty()) {
this.maxSize = 0
}
+ /**
+ * Deletes the given data from the priority queue.
+ * @param data - Data to delete.
+ * @returns `true` if the data was deleted, `false` otherwise.
+ */
+ public delete (data: T): boolean {
+ let node: PriorityQueueNode<T> | undefined = this.tail
+ let prev: PriorityQueueNode<T> | undefined
+ while (node != null) {
+ if (node.delete(data)) {
+ if (node.empty()) {
+ if (node === this.tail && node.next != null) {
+ this.tail = node.next
+ delete node.next
+ } else if (node.next != null && prev != null) {
+ prev.next = node.next
+ delete node.next
+ } else if (node.next == null && prev != null) {
+ delete prev.next
+ this.head = prev
+ }
+ }
+ return true
+ }
+ prev = node
+ node = node.next
+ }
+ return false
+ }
+
/**
* Dequeue data from the priority queue.
* @param bucket - The prioritized bucket to dequeue from.
* Clears the fixed queue.
*/
clear: () => void
+ /**
+ * Deletes the given data from the fixed priority queue.
+ * @param data - Data to delete.
+ * @returns `true` if the data was deleted, `false` otherwise.
+ */
+ delete: (data: T) => boolean
/**
* Dequeue data from the fixed queue.
* @returns The dequeued data or `undefined` if the fixed queue is empty.
* Task functions properties.
*/
readonly taskFunctionsProperties?: TaskFunctionProperties[]
+ /**
+ * Task operation:
+ * - `'abort'` - Abort a task.
+ */
+ readonly taskOperation?: 'abort'
/**
* Task performance.
*/
* @internal
*/
export interface PromiseResponseWrapper<Response = unknown> {
+ /**
+ * The task abort signal.
+ */
+ readonly abortSignal?: AbortSignal
/**
* The asynchronous resource used to track the task execution.
*/
* @internal
*/
export interface Task<Data = unknown> {
+ /**
+ * Whether the task is abortable or not.
+ */
+ readonly abortable?: boolean
/**
* Task input data that will be passed to the worker.
*/
* @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
*/
export interface WorkerError<Data = unknown> {
+ /**
+ * Whether the error is an abort error or not.
+ */
+ readonly aborted: boolean
/**
* Data triggering the error.
*/
--- /dev/null
+export class AbortError extends Error {
+ public constructor (message: string) {
+ super(message)
+ this.name = 'AbortError'
+ }
+}
import type { Worker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
+import { EventEmitter } from 'node:events'
import { performance } from 'node:perf_hooks'
import type {
TaskFunctions,
TaskSyncFunction,
} from './task-functions.js'
+import type { AbortTaskEventDetail } from './worker-types.js'
import {
buildTaskFunctionProperties,
isAsyncFunction,
isPlainObject,
} from '../utils.js'
+import { AbortError } from './abort-error.js'
import {
checkTaskFunctionName,
checkValidTaskFunctionObjectEntry,
MainWorker extends MessagePort | Worker,
Data = unknown,
Response = unknown
-> {
+> extends EventEmitter {
/**
* Handler id of the `activeInterval` worker activity check.
*/
*/
protected statistics?: WorkerStatistics
+ /**
+ * Task abort functions processed by the worker when task operation 'abort' is received.
+ */
+ protected taskAbortFunctions: Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ () => void
+ >
+
/**
* Task function object(s) processed by the worker when the pool's `execute` method is invoked.
*/
taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
+ super()
if (this.isMain == null) {
throw new Error('isMain parameter is mandatory')
}
this.checkTaskFunctions(taskFunctions)
+ this.taskAbortFunctions = new Map<
+ `${string}-${string}-${string}-${string}-${string}`,
+ () => void
+ >()
+ this.on('abortTask', (eventDetail: AbortTaskEventDetail) => {
+ const { taskId } = eventDetail
+ if (this.taskAbortFunctions.has(taskId)) {
+ this.taskAbortFunctions.get(taskId)?.()
+ }
+ })
this.checkWorkerOptions(this.opts)
if (!this.isMain) {
// Should be once() but Node.js on windows has a bug that prevents it from working
* @returns The worker error object.
*/
protected abstract handleError (error: Error): {
+ aborted: boolean
error?: Error
message: string
stack?: string
this.sendToMainWorker({ kill: 'failure' })
}
}
+ this.removeAllListeners()
}
/**
statistics,
taskFunctionOperation,
taskId,
+ taskOperation,
} = message
if (statistics != null) {
// Statistics message received
} else if (taskId != null && data != null) {
// Task message received
this.run(message)
+ } else if (taskOperation === 'abort' && taskId != null) {
+ // Abort task operation message received
+ this.emit('abortTask', { taskId })
} else if (kill === true) {
// Kill message received
this.handleKillMessage(message)
* @param task - The task to execute.
*/
protected readonly run = (task: Task<Data>): void => {
- const { data, name, taskId } = task
+ const { abortable, data, name, taskId } = task
const taskFunctionName = name ?? DEFAULT_TASK_NAME
if (!this.taskFunctions.has(taskFunctionName)) {
this.sendToMainWorker({
})
return
}
- const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction
+ let fn: TaskFunction<Data, Response>
+ if (abortable === true) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ fn = this.getAbortableTaskFunction(taskFunctionName, taskId!)
+ } else {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ fn = this.taskFunctions.get(taskFunctionName)!.taskFunction
+ }
if (isAsyncFunction(fn)) {
this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
} else {
fn: TaskAsyncFunction<Data, Response>,
task: Task<Data>
): void => {
- const { data, name, taskId } = task
+ const { abortable, data, name, taskId } = task
let taskPerformance = this.beginTaskPerformance(name)
fn(data)
.then(res => {
})
.finally(() => {
this.updateLastTaskTimestamp()
+ if (abortable === true) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.taskAbortFunctions.delete(taskId!)
+ }
})
.catch(EMPTY_FUNCTION)
}
fn: TaskSyncFunction<Data, Response>,
task: Task<Data>
): void => {
- const { data, name, taskId } = task
+ const { abortable, data, name, taskId } = task
try {
let taskPerformance = this.beginTaskPerformance(name)
const res = fn(data)
})
} finally {
this.updateLastTaskTimestamp()
+ if (abortable === true) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.taskAbortFunctions.delete(taskId!)
+ }
}
}
}
}
+ /**
+ * Gets abortable task function.
+ * An abortable promise is built to permit the task to be aborted.
+ * @param name - The name of the task.
+ * @param taskId - The task id.
+ * @returns The abortable task function.
+ */
+ private getAbortableTaskFunction (
+ name: string,
+ taskId: `${string}-${string}-${string}-${string}-${string}`
+ ): TaskAsyncFunction<Data, Response> {
+ return async (data?: Data): Promise<Response> =>
+ await new Promise<Response>(
+ (resolve, reject: (reason?: unknown) => void) => {
+ this.taskAbortFunctions.set(taskId, () => {
+ reject(new AbortError(`Task '${name}' id '${taskId}' aborted`))
+ })
+ const taskFunction = this.taskFunctions.get(name)?.taskFunction
+ if (isAsyncFunction(taskFunction)) {
+ ;(taskFunction as TaskAsyncFunction<Data, Response>)(data)
+ .then(resolve)
+ .catch(reject)
+ } else {
+ resolve((taskFunction as TaskSyncFunction<Data, Response>)(data))
+ }
+ }
+ )
+ }
+
/**
* Starts the worker check active interval.
*/
import type { TaskFunction, TaskFunctions } from './task-functions.js'
import type { WorkerOptions } from './worker-options.js'
+import { AbortError } from './abort-error.js'
import { AbstractWorker } from './abstract-worker.js'
/**
/**
* @inheritDoc
*/
- protected handleError (error: Error): { message: string; stack?: string } {
- return { message: error.message, stack: error.stack }
+ protected handleError (error: Error): {
+ aborted: boolean
+ message: string
+ stack?: string
+ } {
+ return {
+ aborted: error instanceof AbortError,
+ message: error.message,
+ stack: error.stack,
+ }
}
/** @inheritDoc */
import type { TaskFunction, TaskFunctions } from './task-functions.js'
import type { WorkerOptions } from './worker-options.js'
+import { AbortError } from './abort-error.js'
import { AbstractWorker } from './abstract-worker.js'
/**
* @inheritDoc
*/
protected handleError (error: Error): {
+ aborted: boolean
error: Error
message: string
stack?: string
} {
- return { error, message: error.message, stack: error.stack }
+ return {
+ aborted: error instanceof AbortError,
+ error,
+ message: error.message,
+ stack: error.stack,
+ }
}
/** @inheritDoc */
--- /dev/null
+export interface AbortTaskEventDetail {
+ taskId: `${string}-${string}-${string}-${string}-${string}`
+}
continuousStealing: false,
dynamic: false,
id: expect.any(Number),
+ queuedTaskAbortion: false,
ready: true,
stealing: false,
stolen: false,
continuousStealing: false,
dynamic: false,
id: expect.any(Number),
+ queuedTaskAbortion: false,
ready: true,
stealing: false,
stolen: false,
new TypeError('name argument must not be an empty string')
)
await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
- new TypeError('transferList argument must be an array')
+ new TypeError('abortSignal argument must be an AbortSignal')
)
+ await expect(
+ pool.execute(undefined, undefined, new AbortController().signal, {})
+ ).rejects.toThrow(new TypeError('transferList argument must be an array'))
await expect(pool.execute(undefined, 'unknown')).rejects.toThrow(
new Error("Task function 'unknown' not found")
)
await expect(pool.mapExecute([undefined], '')).rejects.toThrow(
new TypeError('name argument must not be an empty string')
)
- await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
- new TypeError('transferList argument must be an array')
+ await expect(pool.mapExecute([undefined], undefined, 0)).rejects.toThrow(
+ new TypeError('abortSignals argument must be an iterable')
+ )
+ await expect(
+ pool.mapExecute([undefined], undefined, [undefined])
+ ).rejects.toThrow(
+ new TypeError('abortSignals argument must be an iterable of AbortSignal')
+ )
+ await expect(
+ pool.mapExecute([undefined], undefined, [
+ new AbortController().signal,
+ new AbortController().signal,
+ ])
+ ).rejects.toThrow(
+ new Error('data and abortSignals arguments must have the same length')
)
+ await expect(
+ pool.mapExecute(
+ [undefined],
+ undefined,
+ [new AbortController().signal],
+ {}
+ )
+ ).rejects.toThrow(new TypeError('transferList argument must be an array'))
await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow(
new Error("Task function 'unknown' not found")
)
expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
expect(typeof inError.stack === 'string').toBe(true)
expect(taskError).toStrictEqual({
+ aborted: false,
data,
message: inError.message,
name: DEFAULT_TASK_NAME,
)
expect(typeof inError.stack === 'string').toBe(true)
expect(taskError).toStrictEqual({
+ aborted: false,
data,
message: inError.message,
name: DEFAULT_TASK_NAME,
expect(usedTime).toBeGreaterThanOrEqual(2000)
})
+ it('Verify that task can be aborted', async () => {
+ let error
+
+ try {
+ await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeInstanceOf(Error)
+ expect(error.name).toBe('TimeoutError')
+ expect(error.message).toBe('The operation was aborted due to timeout')
+ expect(error.stack).toBeDefined()
+
+ const abortController = new AbortController()
+ setTimeout(() => {
+ abortController.abort(new Error('Task aborted'))
+ }, 500)
+ try {
+ await asyncErrorPool.execute({}, 'default', abortController.signal)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeInstanceOf(Error)
+ expect(error.message).toBe('Task aborted')
+ expect(error.stack).toBeDefined()
+ })
+
it('Shutdown test', async () => {
const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
expect(pool.emitter.eventNames()).toStrictEqual([])
let error
let result
try {
- result = await pool.execute(undefined, undefined, [
+ result = await pool.execute(undefined, undefined, undefined, [
new ArrayBuffer(16),
new MessageChannel().port1,
])
expect(result).toStrictEqual({ ok: 1 })
expect(error).toBeUndefined()
try {
- result = await pool.execute(undefined, undefined, [
+ result = await pool.execute(undefined, undefined, undefined, [
new SharedArrayBuffer(16),
])
} catch (e) {
expect(inError.message).toStrictEqual('Error Message from ThreadWorker')
expect(typeof inError.stack === 'string').toBe(true)
expect(taskError).toStrictEqual({
+ aborted: false,
data,
error: inError,
message: inError.message,
)
expect(typeof inError.stack === 'string').toBe(true)
expect(taskError).toStrictEqual({
+ aborted: false,
data,
error: inError,
message: inError.message,
expect(usedTime).toBeGreaterThanOrEqual(2000)
})
+ it('Verify that task can be aborted', async () => {
+ let error
+
+ try {
+ await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeInstanceOf(Error)
+ expect(error.name).toBe('TimeoutError')
+ expect(error.message).toBe('The operation was aborted due to timeout')
+ expect(error.stack).toBeDefined()
+
+ const abortController = new AbortController()
+ setTimeout(() => {
+ abortController.abort(new Error('Task aborted'))
+ }, 500)
+ try {
+ await asyncErrorPool.execute({}, 'default', abortController.signal)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeInstanceOf(Error)
+ expect(error.message).toBe('Task aborted')
+ expect(error.stack).toBeDefined()
+ })
+
it('Shutdown test', async () => {
const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
expect(pool.emitter.eventNames()).toStrictEqual([])
continuousStealing: false,
dynamic: false,
id: threadWorker.threadId,
+ queuedTaskAbortion: false,
ready: false,
stealing: false,
stolen: false,
continuousStealing: false,
dynamic: false,
id: clusterWorker.id,
+ queuedTaskAbortion: false,
ready: false,
stealing: false,
stolen: false,
continuousStealing: false,
dynamic: false,
id: threadWorkerNode.worker.threadId,
+ queuedTaskAbortion: false,
ready: false,
stealing: false,
stolen: false,
continuousStealing: false,
dynamic: false,
id: clusterWorkerNode.worker.id,
+ queuedTaskAbortion: false,
ready: false,
stealing: false,
stolen: false,
expect(fixedPriorityQueue.capacity).toBe(queueSize)
})
+ it('Verify delete() behavior', () => {
+ const fixedPriorityQueue = new FixedPriorityQueue()
+ fixedPriorityQueue.enqueue(1)
+ fixedPriorityQueue.enqueue(2, -1)
+ fixedPriorityQueue.enqueue(3)
+ expect(fixedPriorityQueue.start).toBe(0)
+ expect(fixedPriorityQueue.size).toBe(3)
+ expect(fixedPriorityQueue.nodeArray).toMatchObject([
+ { data: 2, priority: -1 },
+ { data: 1, priority: 0 },
+ { data: 3, priority: 0 },
+ ])
+ expect(fixedPriorityQueue.delete(2)).toBe(true)
+ expect(fixedPriorityQueue.start).toBe(0)
+ expect(fixedPriorityQueue.size).toBe(2)
+ expect(fixedPriorityQueue.nodeArray).toMatchObject([
+ { data: 1, priority: 0 },
+ { data: 3, priority: 0 },
+ ])
+ expect(fixedPriorityQueue.delete(3)).toBe(true)
+ expect(fixedPriorityQueue.start).toBe(0)
+ expect(fixedPriorityQueue.size).toBe(1)
+ expect(fixedPriorityQueue.nodeArray).toMatchObject([
+ { data: 1, priority: 0 },
+ ])
+ expect(fixedPriorityQueue.delete(1)).toBe(true)
+ expect(fixedPriorityQueue.start).toBe(0)
+ expect(fixedPriorityQueue.size).toBe(0)
+ expect(fixedPriorityQueue.nodeArray).toMatchObject([])
+ expect(fixedPriorityQueue.delete(2)).toBe(false)
+ expect(fixedPriorityQueue.start).toBe(0)
+ expect(fixedPriorityQueue.size).toBe(0)
+ expect(fixedPriorityQueue.nodeArray).toMatchObject([])
+ })
+
it('Verify iterator behavior', () => {
const fixedPriorityQueue = new FixedPriorityQueue()
fixedPriorityQueue.enqueue(1)
expect(fixedQueue.capacity).toBe(queueSize)
})
+ it('Verify delete() behavior', () => {
+ const fixedQueue = new FixedQueue()
+ fixedQueue.enqueue(1)
+ fixedQueue.enqueue(2, -1)
+ fixedQueue.enqueue(3)
+ expect(fixedQueue.start).toBe(0)
+ expect(fixedQueue.size).toBe(3)
+ expect(fixedQueue.nodeArray).toMatchObject([
+ { data: 1, priority: 0 },
+ { data: 2, priority: -1 },
+ { data: 3, priority: 0 },
+ ])
+ expect(fixedQueue.delete(2)).toBe(true)
+ expect(fixedQueue.start).toBe(0)
+ expect(fixedQueue.size).toBe(2)
+ expect(fixedQueue.nodeArray).toMatchObject([
+ { data: 1, priority: 0 },
+ { data: 3, priority: 0 },
+ ])
+ expect(fixedQueue.delete(3)).toBe(true)
+ expect(fixedQueue.start).toBe(0)
+ expect(fixedQueue.size).toBe(1)
+ expect(fixedQueue.nodeArray).toMatchObject([{ data: 1, priority: 0 }])
+ expect(fixedQueue.delete(1)).toBe(true)
+ expect(fixedQueue.start).toBe(0)
+ expect(fixedQueue.size).toBe(0)
+ expect(fixedQueue.nodeArray).toMatchObject([])
+ expect(fixedQueue.delete(2)).toBe(false)
+ expect(fixedQueue.start).toBe(0)
+ expect(fixedQueue.size).toBe(0)
+ expect(fixedQueue.nodeArray).toMatchObject([])
+ })
+
it('Verify iterator behavior', () => {
const fixedQueue = new FixedQueue()
fixedQueue.enqueue(1)
expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
})
+ it('Verify default bucket size delete() behavior', () => {
+ const priorityQueue = new PriorityQueue(defaultBucketSize, true)
+ priorityQueue.enqueue(1)
+ priorityQueue.enqueue(2)
+ priorityQueue.enqueue(3)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(3)
+ expect(priorityQueue.maxSize).toBe(3)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(2)).toBe(true)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(2)
+ expect(priorityQueue.maxSize).toBe(3)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(3)).toBe(true)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(1)
+ expect(priorityQueue.maxSize).toBe(3)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(1)).toBe(true)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(0)
+ expect(priorityQueue.maxSize).toBe(3)
+ expect(priorityQueue.tail.empty()).toBe(true)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(2)).toBe(false)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(0)
+ expect(priorityQueue.maxSize).toBe(3)
+ expect(priorityQueue.tail.empty()).toBe(true)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ })
+
+ it('Verify bucketSize=2 delete() behavior', () => {
+ const priorityQueue = new PriorityQueue(2, true)
+ priorityQueue.enqueue(1)
+ priorityQueue.enqueue(2)
+ priorityQueue.enqueue(3)
+ priorityQueue.enqueue(3, -1)
+ priorityQueue.enqueue(1, 1)
+ priorityQueue.enqueue(3, -2)
+ expect(priorityQueue.buckets).toBe(3)
+ expect(priorityQueue.size).toBe(6)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+ expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(2)).toBe(true)
+ expect(priorityQueue.buckets).toBe(2)
+ expect(priorityQueue.size).toBe(5)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+ expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(3)).toBe(true)
+ expect(priorityQueue.buckets).toBe(2)
+ expect(priorityQueue.size).toBe(4)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+ expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(1)).toBe(true)
+ expect(priorityQueue.buckets).toBe(1)
+ expect(priorityQueue.size).toBe(3)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+ expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(1)).toBe(true)
+ expect(priorityQueue.buckets).toBe(1)
+ expect(priorityQueue.size).toBe(2)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+ expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(3)).toBe(true)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(1)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(2)).toBe(false)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(1)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(false)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ expect(priorityQueue.delete(3)).toBe(true)
+ expect(priorityQueue.buckets).toBe(0)
+ expect(priorityQueue.size).toBe(0)
+ expect(priorityQueue.maxSize).toBe(6)
+ expect(priorityQueue.tail.empty()).toBe(true)
+ expect(priorityQueue.tail.next).toBe(undefined)
+ expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+ })
+
it('Verify enablePriority setter behavior', () => {
const priorityQueue = new PriorityQueue(2)
expect(priorityQueue.enablePriority).toBe(false)
--- /dev/null
+import { expect } from '@std/expect'
+
+import { AbortError } from '../../lib/worker/abort-error.cjs'
+
+describe('Abort error test suite', () => {
+ it('Verify constructor() behavior', () => {
+ const errorMessage = 'This is an abort error message'
+ const abortError = new AbortError(errorMessage)
+
+ expect(abortError).toBeInstanceOf(AbortError)
+ expect(abortError).toBeInstanceOf(Error)
+ expect(abortError.name).toBe('AbortError')
+ expect(abortError.message).toBe(errorMessage)
+ expect(abortError.stack).toBeDefined()
+ })
+})
const error = new Error('Error as an error')
const worker = new ClusterWorker(() => {})
expect(worker.handleError(error)).toStrictEqual({
+ aborted: false,
message: error.message,
stack: error.stack,
})
const error = new Error('Error as an error')
const worker = new ThreadWorker(() => {})
expect(worker.handleError(error)).toStrictEqual({
+ aborted: false,
error,
message: error.message,
stack: error.stack,