From 729c563db85562dd7d0f7733b1a3e2d98467134b Mon Sep 17 00:00:00 2001 From: Shinigami Date: Sat, 13 Feb 2021 11:42:48 +0100 Subject: [PATCH] Improve JSDoc comments (#130) Co-authored-by: ST-DDT --- src/pools/abstract-pool.ts | 113 +++++++++++++++++++++++++++++++--- src/pools/cluster/dynamic.ts | 30 ++++++--- src/pools/cluster/fixed.ts | 18 ++++-- src/pools/pool.ts | 15 +++++ src/pools/thread/dynamic.ts | 26 +++++--- src/pools/thread/fixed.ts | 20 ++++-- src/utility-types.ts | 35 +++++++++++ src/worker/abstract-worker.ts | 54 +++++++++++++++- src/worker/cluster-worker.ts | 18 +++++- src/worker/thread-worker.ts | 21 ++++++- src/worker/worker-options.ts | 9 ++- 11 files changed, 314 insertions(+), 45 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1ccaca8b..0763f04f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,16 +2,33 @@ import EventEmitter from 'events' import type { MessageValue } from '../utility-types' import type { IPool } from './pool' +/** + * Callback invoked if the worker raised an error. + */ export type ErrorHandler = (this: Worker, e: Error) => void + +/** + * Callback invoked when the worker has started successfully. + */ export type OnlineHandler = (this: Worker) => void + +/** + * Callback invoked when the worker exits successfully. + */ export type ExitHandler = (this: Worker, code: number) => void +/** + * Basic interface that describes the minimum required implementation of listener events for a pool-worker. + */ export interface IWorker { on(event: 'error', handler: ErrorHandler): void on(event: 'online', handler: OnlineHandler): void on(event: 'exit', handler: ExitHandler): void } +/** + * Options for a poolifier pool. + */ export interface PoolOptions { /** * A function that will listen for error event on each worker. @@ -26,32 +43,70 @@ export interface PoolOptions { */ exitHandler?: ExitHandler /** - * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). + * This is just to avoid non-useful warning messages. + * + * Will be used to set `maxListeners` on event emitters (workers are event emitters). * * @default 1000 + * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n) */ maxTasks?: number } +/** + * Internal poolifier pool emitter. + */ class PoolEmitter extends EventEmitter {} +/** + * Base class containing some shared logic for all poolifier pools. + * + * @template Worker Type of worker which manages this pool. + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. + */ export abstract class AbstractPool< Worker extends IWorker, Data = unknown, Response = unknown > implements IPool { + /** + * List of currently available workers. + */ public readonly workers: Worker[] = [] + + /** + * ID for the next worker. + */ public nextWorker: number = 0 /** - * `workerId` as key and an integer value + * - `key`: The `Worker` + * - `value`: Number of tasks that has been assigned to that worker since it started */ public readonly tasks: Map = new Map() + /** + * Emitter on which events can be listened to. + * + * Events that can currently be listened to: + * + * - `'FullPool'` + */ public readonly emitter: PoolEmitter + /** + * ID of the next message. + */ protected id: number = 0 + /** + * Constructs a new poolifier pool. + * + * @param numWorkers Number of workers that this pool should manage. + * @param filePath Path to the worker-file. + * @param opts Options for the pool. Default: `{ maxTasks: 1000 }` + */ public constructor ( public readonly numWorkers: number, public readonly filePath: string, @@ -74,10 +129,17 @@ export abstract class AbstractPool< this.emitter = new PoolEmitter() } + /** + * Setup hook that can be overridden by a Poolifer pool implementation + * to run code before workers are created in the abstract constructor. + */ protected setupHook (): void { // Can be overridden } + /** + * Should return whether the worker is the main worker or not. + */ protected abstract isMain (): boolean public async destroy (): Promise { @@ -86,13 +148,29 @@ export abstract class AbstractPool< } } + /** + * Shut down given worker. + * + * @param worker A worker within `workers`. + */ protected abstract destroyWorker (worker: Worker): void | Promise + /** + * Send a message to the given worker. + * + * @param worker The worker which should receive the message. + * @param message The message. + */ protected abstract sendToWorker ( worker: Worker, message: MessageValue ): void + /** + * Adds the given worker to the pool. + * + * @param worker Worker that will be added. + */ protected addWorker (worker: Worker): void { const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { @@ -102,6 +180,11 @@ export abstract class AbstractPool< } } + /** + * Removes the given worker from the pool. + * + * @param worker Worker that will be removed. + */ protected removeWorker (worker: Worker): void { // Clean worker from data structure const workerIndex = this.workers.indexOf(worker) @@ -109,14 +192,8 @@ export abstract class AbstractPool< this.tasks.delete(worker) } - /** - * Execute the task specified into the constructor with the data parameter. - * - * @param data The input for the task specified. - * @returns Promise that is resolved when the task is done. - */ public execute (data: Data): Promise { - // configure worker to handle message with the specified task + // Configure worker to handle message with the specified task const worker = this.chooseWorker() this.addWorker(worker) const id = ++this.id @@ -149,16 +226,34 @@ export abstract class AbstractPool< }) } + /** + * Choose a worker for the next task. + * + * The default implementation uses a round robin algorithm to distribute the load. + */ protected chooseWorker (): Worker { this.nextWorker = this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1 return this.workers[this.nextWorker] } + /** + * Returns a newly created worker. + */ protected abstract newWorker (): Worker + /** + * Function that can be hooked up when a worker has been newly created and moved to the workers registry. + * + * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default. + * + * @param worker The newly created worker. + */ protected abstract afterNewWorkerPushed (worker: Worker): void + /** + * Creates a new worker for this pool and sets it up completely. + */ protected internalNewWorker (): Worker { const worker: Worker = this.newWorker() worker.on('error', this.opts.errorHandler ?? (() => {})) diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 25940ad8..99fa828d 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -4,10 +4,13 @@ import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' /** - * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer. + * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers. * - * This cluster pool will create new workers when the other ones are busy, until the max number of workers, - * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method. + * This cluster pool creates new workers when the others are busy, up to the maximum number of workers. + * When the maximum number of workers is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`. + * + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. * * @author [Christopher Quadflieg](https://github.com/Shinigami92) * @since 2.0.0 @@ -17,10 +20,12 @@ export class DynamicClusterPool< Response extends JSONValue = JSONValue > extends FixedClusterPool { /** - * @param min Min number of workers that will be always active - * @param max Max number of workers that will be active - * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine. - * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` + * Constructs a new poolifier dynamic cluster pool. + * + * @param min Minimum number of workers which are always active. + * @param max Maximum number of workers that can be created by this pool. + * @param filename Path to an implementation of a `ClusterWorker` file, which can be relative or absolute. + * @param opts Options for this fixed cluster pool. Default: `{ maxTasks: 1000 }` */ public constructor ( min: number, @@ -31,6 +36,13 @@ export class DynamicClusterPool< super(min, filename, opts) } + /** + * Choose a worker for the next task. + * + * It will first check for and return an idle worker. + * If all workers are busy, then it will try to create a new one up to the `max` worker count. + * If the max worker count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load. + */ protected chooseWorker (): Worker { let worker: Worker | undefined for (const entry of this.tasks) { @@ -41,14 +53,14 @@ export class DynamicClusterPool< } if (worker) { - // a worker is free, use it + // A worker is free, use it return worker } else { if (this.workers.length === this.max) { this.emitter.emit('FullPool') return super.chooseWorker() } - // all workers are busy create a new worker + // All workers are busy, create a new worker const worker = this.internalNewWorker() worker.on('message', (message: MessageValue) => { if (message.kill) { diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 65f6ce54..f17deed7 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -3,6 +3,9 @@ import type { JSONValue, MessageValue } from '../../utility-types' import type { PoolOptions } from '../abstract-pool' import { AbstractPool } from '../abstract-pool' +/** + * Options for a poolifier cluster pool. + */ export interface ClusterPoolOptions extends PoolOptions { /** * Key/value pairs to add to worker process environment. @@ -14,9 +17,14 @@ export interface ClusterPoolOptions extends PoolOptions { } /** - * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer. + * A cluster pool with a fixed number of workers. + * + * It is possible to perform tasks in sync or asynchronous mode as you prefer. * - * This pool will select the worker in a round robin fashion. + * This pool selects the workers in a round robin fashion. + * + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. * * @author [Christopher Quadflieg](https://github.com/Shinigami92) * @since 2.0.0 @@ -26,9 +34,11 @@ export class FixedClusterPool< Response extends JSONValue = JSONValue > extends AbstractPool { /** + * Constructs a new poolifier fixed cluster pool. + * * @param numWorkers Number of workers for this pool. - * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine. - * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` + * @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute. + * @param opts Options for this fixed cluster pool. Default: `{ maxTasks: 1000 }` */ public constructor ( numWorkers: number, diff --git a/src/pools/pool.ts b/src/pools/pool.ts index e8f391cc..55e15792 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,4 +1,19 @@ +/** + * Contract definition for a poolifier pool. + * + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. + */ export interface IPool { + /** + * Shut down every current worker in this pool. + */ destroy(): Promise + /** + * Perform the task specified in the constructor with the data parameter. + * + * @param data The input for the specified task. + * @returns Promise that will be resolved when the task is successfully completed. + */ execute(data: Data): Promise } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index b63d14e2..77f0a2d5 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -4,10 +4,13 @@ import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' /** - * A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer. + * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. * - * This thread pool will create new workers when the other ones are busy, until the max number of threads, - * when the max number of threads is reached, an event will be emitted, if you want to listen this event use the emitter method. + * This thread pool creates new threads when the others are busy, up to the maximum number of threads. + * When the maximum number of threads is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`. + * + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. * * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 @@ -17,10 +20,12 @@ export class DynamicThreadPool< Response extends JSONValue = JSONValue > extends FixedThreadPool { /** - * @param min Min number of threads that will be always active - * @param max Max number of threads that will be active - * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine. - * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` + * Constructs a new poolifier dynamic thread pool. + * + * @param min Minimum number of threads which are always active. + * @param max Maximum number of threads that can be created by this pool. + * @param filename Path to an implementation of a `ThreadWorker` file, which can be relative or absolute. + * @param opts Options for this fixed thread pool. Default: `{ maxTasks: 1000 }` */ public constructor ( min: number, @@ -31,6 +36,13 @@ export class DynamicThreadPool< super(min, filename, opts) } + /** + * Choose a thread for the next task. + * + * It will first check for and return an idle thread. + * If all threads are busy, then it will try to create a new one up to the `max` thread count. + * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load. + */ protected chooseWorker (): ThreadWorkerWithMessageChannel { let worker: ThreadWorkerWithMessageChannel | undefined for (const entry of this.tasks) { diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 6cf8b526..7ef5f84a 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -3,12 +3,20 @@ import type { Draft, JSONValue, MessageValue } from '../../utility-types' import type { PoolOptions } from '../abstract-pool' import { AbstractPool } from '../abstract-pool' +/** + * A thread worker with message channels for communication between main thread and thread worker. + */ export type ThreadWorkerWithMessageChannel = Worker & Draft /** - * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer. + * A thread pool with a fixed number of threads. + * + * It is possible to perform tasks in sync or asynchronous mode as you prefer. + * + * This pool selects the threads in a round robin fashion. * - * This pool will select the worker thread in a round robin fashion. + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. * * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 @@ -18,9 +26,11 @@ export class FixedThreadPool< Response extends JSONValue = JSONValue > extends AbstractPool { /** - * @param numThreads Num of threads for this worker pool. - * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine. - * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }` + * Constructs a new poolifier fixed thread pool. + * + * @param numThreads Number of threads for this pool. + * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute. + * @param opts Options for this fixed thread pool. Default: `{ maxTasks: 1000 }` */ public constructor ( numThreads: number, diff --git a/src/utility-types.ts b/src/utility-types.ts index e177adae..aa4d058c 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,15 +1,50 @@ +/** + * Make all properties in T non-readonly + */ export type Draft = { -readonly [P in keyof T]?: T[P] } +/** + * Serializable primitive JSON value. + */ export type JSONPrimitive = number | boolean | string | null +/** + * Serializable JSON value. + */ // eslint-disable-next-line no-use-before-define export type JSONValue = JSONPrimitive | JSONArray | JSONObject +/** + * Serializable JSON object. + */ export type JSONObject = { [k: string]: JSONValue } +/** + * Serializable JSON array. + */ export type JSONArray = Array +/** + * Message object that is passed between worker and main worker. + */ export interface MessageValue { + /** + * Input data that will be passed to the worker. + */ readonly data?: Data + /** + * ID of the message. + */ readonly id?: number + /** + * Kill code. + */ readonly kill?: number + /** + * Error. + */ readonly error?: string + /** + * Reference to main worker. + * + * _Only for internal use_ + */ readonly parent?: MessagePort } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 1ae7b65c..fd163547 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -2,22 +2,42 @@ import { AsyncResource } from 'async_hooks' import type { MessageValue } from '../utility-types' import type { WorkerOptions } from './worker-options' +/** + * Base class containing some shared logic for all poolifier workers. + * + * @template MainWorker Type of main worker. + * @template Data Type of data this worker receives from pool's execution. + * @template Response Type of response the worker sends back to the main worker. + */ export abstract class AbstractWorker< MainWorker, Data = unknown, Response = unknown > extends AsyncResource { + /** + * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires. + */ protected readonly maxInactiveTime: number + /** + * Whether the worker is working asynchronously or not. + */ protected readonly async: boolean + /** + * Timestamp of the last task processed by this worker. + */ protected lastTask: number + /** + * Handler ID of the `interval` alive check. + */ protected readonly interval?: NodeJS.Timeout /** + * Constructs a new poolifier worker. * * @param type The type of async event. - * @param isMain - * @param fn - * @param opts + * @param isMain Whether this is the main worker or not. + * @param fn Function processed by the worker when the pool's `execution` function is invoked. + * @param opts Options for the worker. */ public constructor ( type: string, @@ -41,20 +61,42 @@ export abstract class AbstractWorker< } } + /** + * Returns the main worker. + */ protected abstract getMainWorker (): MainWorker + /** + * Send a message to the main worker. + * + * @param message The response message. + */ protected abstract sendToMainWorker (message: MessageValue): void + /** + * Check to see if the worker should be terminated, because its living too long. + */ protected checkAlive (): void { if (Date.now() - this.lastTask > this.maxInactiveTime) { this.sendToMainWorker({ kill: 1 }) } } + /** + * Handle an error and convert it to a string so it can be sent back to the main worker. + * + * @param e The error raised by the worker. + */ protected handleError (e: Error | string): string { return (e as unknown) as string } + /** + * Run the given function synchronously. + * + * @param fn Function that will be executed. + * @param value Input data for the given function. + */ protected run ( fn: (data?: Data) => Response, value: MessageValue @@ -70,6 +112,12 @@ export abstract class AbstractWorker< } } + /** + * Run the given function asynchronously. + * + * @param fn Function that will be executed. + * @param value Input data for the given function. + */ protected runAsync ( fn: (data?: Data) => Promise, value: MessageValue diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 4deeb9ae..9402ec10 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -5,10 +5,16 @@ import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' /** - * An example worker that will be always alive, you just need to **extend** this class if you want a static pool. + * A cluster worker used by a poolifier `ClusterPool`. * - * When this worker is inactive for more than 1 minute, it will send this info to the main worker, - * if you are using DynamicClusterPool, the workers created after will be killed, the min num of worker will be guaranteed. + * When this worker is inactive for more than the given `maxInactiveTime`, + * it will send a termination request to its main worker. + * + * If you use a `DynamicClusterPool` the extra workers that were created will be terminated, + * but the minimum number of workers will be guaranteed. + * + * @template Data Type of data this worker receives from pool's execution. + * @template Response Type of response the worker sends back to the main worker. * * @author [Christopher Quadflieg](https://github.com/Shinigami92) * @since 2.0.0 @@ -17,6 +23,12 @@ export class ClusterWorker< Data extends JSONValue = JSONValue, Response extends JSONValue = JSONValue > extends AbstractWorker { + /** + * Constructs a new poolifier cluster worker. + * + * @param fn Function processed by the worker when the pool's `execution` function is invoked. + * @param opts Options for the worker. + */ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { super('worker-cluster-pool:pioardi', isMaster, fn, opts) diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index a6c9ec16..b6fbdae2 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -4,10 +4,16 @@ import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' /** - * An example worker that will be always alive, you just need to **extend** this class if you want a static pool. + * A thread worker used by a poolifier `ThreadPool`. * - * When this worker is inactive for more than 1 minute, it will send this info to the main thread, - * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed. + * When this worker is inactive for more than the given `maxInactiveTime`, + * it will send a termination request to its main thread. + * + * If you use a `DynamicThreadPool` the extra workers that were created will be terminated, + * but the minimum number of workers will be guaranteed. + * + * @template Data Type of data this worker receives from pool's execution. + * @template Response Type of response the worker sends back to the main thread. * * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 @@ -16,8 +22,17 @@ export class ThreadWorker< Data extends JSONValue = JSONValue, Response extends JSONValue = JSONValue > extends AbstractWorker { + /** + * Reference to main thread. + */ protected parent?: MessagePort + /** + * Constructs a new poolifier thread worker. + * + * @param fn Function processed by the worker when the pool's `execution` function is invoked. + * @param opts Options for the worker. + */ public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) { super('worker-thread-pool:pioardi', isMainThread, fn, opts) diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 86129901..0236164b 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -1,12 +1,17 @@ +/** + * Options for workers. + */ export interface WorkerOptions { /** - * Max time to wait tasks to work on (in ms), after this period the new worker threads will die. + * Maximum waiting time in milliseconds for tasks. + * + * After this time, newly created workers will be terminated. * * @default 60.000 ms */ maxInactiveTime?: number /** - * `true` if your function contains async pieces, else `false`. + * Whether your worker will perform asynchronous or not. * * @default false */ -- 2.34.1