import type { MessageValue } from '../utility-types'
import type { IPool } from './pool'
+/**
+ * Callback invoked if the worker raised an error.
+ */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
+
+/**
+ * Callback invoked when the worker has started successfully.
+ */
export type OnlineHandler<Worker> = (this: Worker) => void
+
+/**
+ * Callback invoked when the worker exits successfully.
+ */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
+/**
+ * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
+ */
export interface IWorker {
on(event: 'error', handler: ErrorHandler<this>): void
on(event: 'online', handler: OnlineHandler<this>): void
on(event: 'exit', handler: ExitHandler<this>): void
}
+/**
+ * Options for a poolifier pool.
+ */
export interface PoolOptions<Worker> {
/**
* A function that will listen for error event on each worker.
*/
exitHandler?: ExitHandler<Worker>
/**
- * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
+ * This is just to avoid non-useful warning messages.
+ *
+ * Will be used to set `maxListeners` on event emitters (workers are event emitters).
*
* @default 1000
+ * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
*/
maxTasks?: number
}
+/**
+ * Internal poolifier pool emitter.
+ */
class PoolEmitter extends EventEmitter {}
+/**
+ * Base class containing some shared logic for all poolifier pools.
+ *
+ * @template Worker Type of worker which manages this pool.
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
+ */
export abstract class AbstractPool<
Worker extends IWorker,
Data = unknown,
Response = unknown
> implements IPool<Data, Response> {
+ /**
+ * List of currently available workers.
+ */
public readonly workers: Worker[] = []
+
+ /**
+ * ID for the next worker.
+ */
public nextWorker: number = 0
/**
- * `workerId` as key and an integer value
+ * - `key`: The `Worker`
+ * - `value`: Number of tasks that has been assigned to that worker since it started
*/
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
+ /**
+ * Emitter on which events can be listened to.
+ *
+ * Events that can currently be listened to:
+ *
+ * - `'FullPool'`
+ */
public readonly emitter: PoolEmitter
+ /**
+ * ID of the next message.
+ */
protected id: number = 0
+ /**
+ * Constructs a new poolifier pool.
+ *
+ * @param numWorkers Number of workers that this pool should manage.
+ * @param filePath Path to the worker-file.
+ * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
+ */
public constructor (
public readonly numWorkers: number,
public readonly filePath: string,
this.emitter = new PoolEmitter()
}
+ /**
+ * Setup hook that can be overridden by a Poolifer pool implementation
+ * to run code before workers are created in the abstract constructor.
+ */
protected setupHook (): void {
// Can be overridden
}
+ /**
+ * Should return whether the worker is the main worker or not.
+ */
protected abstract isMain (): boolean
public async destroy (): Promise<void> {
}
}
+ /**
+ * Shut down given worker.
+ *
+ * @param worker A worker within `workers`.
+ */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
+ /**
+ * Send a message to the given worker.
+ *
+ * @param worker The worker which should receive the message.
+ * @param message The message.
+ */
protected abstract sendToWorker (
worker: Worker,
message: MessageValue<Data>
): void
+ /**
+ * Adds the given worker to the pool.
+ *
+ * @param worker Worker that will be added.
+ */
protected addWorker (worker: Worker): void {
const previousWorkerIndex = this.tasks.get(worker)
if (previousWorkerIndex !== undefined) {
}
}
+ /**
+ * Removes the given worker from the pool.
+ *
+ * @param worker Worker that will be removed.
+ */
protected removeWorker (worker: Worker): void {
// Clean worker from data structure
const workerIndex = this.workers.indexOf(worker)
this.tasks.delete(worker)
}
- /**
- * Execute the task specified into the constructor with the data parameter.
- *
- * @param data The input for the task specified.
- * @returns Promise that is resolved when the task is done.
- */
public execute (data: Data): Promise<Response> {
- // configure worker to handle message with the specified task
+ // Configure worker to handle message with the specified task
const worker = this.chooseWorker()
this.addWorker(worker)
const id = ++this.id
})
}
+ /**
+ * Choose a worker for the next task.
+ *
+ * The default implementation uses a round robin algorithm to distribute the load.
+ */
protected chooseWorker (): Worker {
this.nextWorker =
this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1
return this.workers[this.nextWorker]
}
+ /**
+ * Returns a newly created worker.
+ */
protected abstract newWorker (): Worker
+ /**
+ * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
+ *
+ * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
+ *
+ * @param worker The newly created worker.
+ */
protected abstract afterNewWorkerPushed (worker: Worker): void
+ /**
+ * Creates a new worker for this pool and sets it up completely.
+ */
protected internalNewWorker (): Worker {
const worker: Worker = this.newWorker()
worker.on('error', this.opts.errorHandler ?? (() => {}))
import { FixedClusterPool } from './fixed'
/**
- * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
+ * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
*
- * This cluster pool will create new workers when the other ones are busy, until the max number of workers,
- * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method.
+ * This cluster pool creates new workers when the others are busy, up to the maximum number of workers.
+ * When the maximum number of workers is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
+ *
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
*
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
* @since 2.0.0
Response extends JSONValue = JSONValue
> extends FixedClusterPool<Data, Response> {
/**
- * @param min Min number of workers that will be always active
- * @param max Max number of workers that will be active
- * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine.
- * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ * Constructs a new poolifier dynamic cluster pool.
+ *
+ * @param min Minimum number of workers which are always active.
+ * @param max Maximum number of workers that can be created by this pool.
+ * @param filename Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
+ * @param opts Options for this fixed cluster pool. Default: `{ maxTasks: 1000 }`
*/
public constructor (
min: number,
super(min, filename, opts)
}
+ /**
+ * Choose a worker for the next task.
+ *
+ * It will first check for and return an idle worker.
+ * If all workers are busy, then it will try to create a new one up to the `max` worker count.
+ * If the max worker count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
+ */
protected chooseWorker (): Worker {
let worker: Worker | undefined
for (const entry of this.tasks) {
}
if (worker) {
- // a worker is free, use it
+ // A worker is free, use it
return worker
} else {
if (this.workers.length === this.max) {
this.emitter.emit('FullPool')
return super.chooseWorker()
}
- // all workers are busy create a new worker
+ // All workers are busy, create a new worker
const worker = this.internalNewWorker()
worker.on('message', (message: MessageValue<Data>) => {
if (message.kill) {
import type { PoolOptions } from '../abstract-pool'
import { AbstractPool } from '../abstract-pool'
+/**
+ * Options for a poolifier cluster pool.
+ */
export interface ClusterPoolOptions extends PoolOptions<Worker> {
/**
* Key/value pairs to add to worker process environment.
}
/**
- * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer.
+ * A cluster pool with a fixed number of workers.
+ *
+ * It is possible to perform tasks in sync or asynchronous mode as you prefer.
*
- * This pool will select the worker in a round robin fashion.
+ * This pool selects the workers in a round robin fashion.
+ *
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
*
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
* @since 2.0.0
Response extends JSONValue = JSONValue
> extends AbstractPool<Worker, Data, Response> {
/**
+ * Constructs a new poolifier fixed cluster pool.
+ *
* @param numWorkers Number of workers for this pool.
- * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
- * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ * @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
+ * @param opts Options for this fixed cluster pool. Default: `{ maxTasks: 1000 }`
*/
public constructor (
numWorkers: number,
+/**
+ * Contract definition for a poolifier pool.
+ *
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
+ */
export interface IPool<Data = unknown, Response = unknown> {
+ /**
+ * Shut down every current worker in this pool.
+ */
destroy(): Promise<void>
+ /**
+ * Perform the task specified in the constructor with the data parameter.
+ *
+ * @param data The input for the specified task.
+ * @returns Promise that will be resolved when the task is successfully completed.
+ */
execute(data: Data): Promise<Response>
}
import { FixedThreadPool } from './fixed'
/**
- * A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer.
+ * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
*
- * This thread pool will create new workers when the other ones are busy, until the max number of threads,
- * when the max number of threads is reached, an event will be emitted, if you want to listen this event use the emitter method.
+ * This thread pool creates new threads when the others are busy, up to the maximum number of threads.
+ * When the maximum number of threads is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
+ *
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
*
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
* @since 0.0.1
Response extends JSONValue = JSONValue
> extends FixedThreadPool<Data, Response> {
/**
- * @param min Min number of threads that will be always active
- * @param max Max number of threads that will be active
- * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
- * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ * Constructs a new poolifier dynamic thread pool.
+ *
+ * @param min Minimum number of threads which are always active.
+ * @param max Maximum number of threads that can be created by this pool.
+ * @param filename Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
+ * @param opts Options for this fixed thread pool. Default: `{ maxTasks: 1000 }`
*/
public constructor (
min: number,
super(min, filename, opts)
}
+ /**
+ * Choose a thread for the next task.
+ *
+ * It will first check for and return an idle thread.
+ * If all threads are busy, then it will try to create a new one up to the `max` thread count.
+ * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
+ */
protected chooseWorker (): ThreadWorkerWithMessageChannel {
let worker: ThreadWorkerWithMessageChannel | undefined
for (const entry of this.tasks) {
import type { PoolOptions } from '../abstract-pool'
import { AbstractPool } from '../abstract-pool'
+/**
+ * A thread worker with message channels for communication between main thread and thread worker.
+ */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
/**
- * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
+ * A thread pool with a fixed number of threads.
+ *
+ * It is possible to perform tasks in sync or asynchronous mode as you prefer.
+ *
+ * This pool selects the threads in a round robin fashion.
*
- * This pool will select the worker thread in a round robin fashion.
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
*
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
* @since 0.0.1
Response extends JSONValue = JSONValue
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
/**
- * @param numThreads Num of threads for this worker pool.
- * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
- * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ * Constructs a new poolifier fixed thread pool.
+ *
+ * @param numThreads Number of threads for this pool.
+ * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
+ * @param opts Options for this fixed thread pool. Default: `{ maxTasks: 1000 }`
*/
public constructor (
numThreads: number,
+/**
+ * Make all properties in T non-readonly
+ */
export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
+/**
+ * Serializable primitive JSON value.
+ */
export type JSONPrimitive = number | boolean | string | null
+/**
+ * Serializable JSON value.
+ */
// eslint-disable-next-line no-use-before-define
export type JSONValue = JSONPrimitive | JSONArray | JSONObject
+/**
+ * Serializable JSON object.
+ */
export type JSONObject = { [k: string]: JSONValue }
+/**
+ * Serializable JSON array.
+ */
export type JSONArray = Array<JSONValue>
+/**
+ * Message object that is passed between worker and main worker.
+ */
export interface MessageValue<Data = unknown> {
+ /**
+ * Input data that will be passed to the worker.
+ */
readonly data?: Data
+ /**
+ * ID of the message.
+ */
readonly id?: number
+ /**
+ * Kill code.
+ */
readonly kill?: number
+ /**
+ * Error.
+ */
readonly error?: string
+ /**
+ * Reference to main worker.
+ *
+ * _Only for internal use_
+ */
readonly parent?: MessagePort
}
import type { MessageValue } from '../utility-types'
import type { WorkerOptions } from './worker-options'
+/**
+ * Base class containing some shared logic for all poolifier workers.
+ *
+ * @template MainWorker Type of main worker.
+ * @template Data Type of data this worker receives from pool's execution.
+ * @template Response Type of response the worker sends back to the main worker.
+ */
export abstract class AbstractWorker<
MainWorker,
Data = unknown,
Response = unknown
> extends AsyncResource {
+ /**
+ * The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires.
+ */
protected readonly maxInactiveTime: number
+ /**
+ * Whether the worker is working asynchronously or not.
+ */
protected readonly async: boolean
+ /**
+ * Timestamp of the last task processed by this worker.
+ */
protected lastTask: number
+ /**
+ * Handler ID of the `interval` alive check.
+ */
protected readonly interval?: NodeJS.Timeout
/**
+ * Constructs a new poolifier worker.
*
* @param type The type of async event.
- * @param isMain
- * @param fn
- * @param opts
+ * @param isMain Whether this is the main worker or not.
+ * @param fn Function processed by the worker when the pool's `execution` function is invoked.
+ * @param opts Options for the worker.
*/
public constructor (
type: string,
}
}
+ /**
+ * Returns the main worker.
+ */
protected abstract getMainWorker (): MainWorker
+ /**
+ * Send a message to the main worker.
+ *
+ * @param message The response message.
+ */
protected abstract sendToMainWorker (message: MessageValue<Response>): void
+ /**
+ * Check to see if the worker should be terminated, because its living too long.
+ */
protected checkAlive (): void {
if (Date.now() - this.lastTask > this.maxInactiveTime) {
this.sendToMainWorker({ kill: 1 })
}
}
+ /**
+ * Handle an error and convert it to a string so it can be sent back to the main worker.
+ *
+ * @param e The error raised by the worker.
+ */
protected handleError (e: Error | string): string {
return (e as unknown) as string
}
+ /**
+ * Run the given function synchronously.
+ *
+ * @param fn Function that will be executed.
+ * @param value Input data for the given function.
+ */
protected run (
fn: (data?: Data) => Response,
value: MessageValue<Data>
}
}
+ /**
+ * Run the given function asynchronously.
+ *
+ * @param fn Function that will be executed.
+ * @param value Input data for the given function.
+ */
protected runAsync (
fn: (data?: Data) => Promise<Response>,
value: MessageValue<Data>
import type { WorkerOptions } from './worker-options'
/**
- * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
+ * A cluster worker used by a poolifier `ClusterPool`.
*
- * When this worker is inactive for more than 1 minute, it will send this info to the main worker,
- * if you are using DynamicClusterPool, the workers created after will be killed, the min num of worker will be guaranteed.
+ * When this worker is inactive for more than the given `maxInactiveTime`,
+ * it will send a termination request to its main worker.
+ *
+ * If you use a `DynamicClusterPool` the extra workers that were created will be terminated,
+ * but the minimum number of workers will be guaranteed.
+ *
+ * @template Data Type of data this worker receives from pool's execution.
+ * @template Response Type of response the worker sends back to the main worker.
*
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
* @since 2.0.0
Data extends JSONValue = JSONValue,
Response extends JSONValue = JSONValue
> extends AbstractWorker<Worker, Data, Response> {
+ /**
+ * Constructs a new poolifier cluster worker.
+ *
+ * @param fn Function processed by the worker when the pool's `execution` function is invoked.
+ * @param opts Options for the worker.
+ */
public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
super('worker-cluster-pool:pioardi', isMaster, fn, opts)
import type { WorkerOptions } from './worker-options'
/**
- * An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
+ * A thread worker used by a poolifier `ThreadPool`.
*
- * When this worker is inactive for more than 1 minute, it will send this info to the main thread,
- * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed.
+ * When this worker is inactive for more than the given `maxInactiveTime`,
+ * it will send a termination request to its main thread.
+ *
+ * If you use a `DynamicThreadPool` the extra workers that were created will be terminated,
+ * but the minimum number of workers will be guaranteed.
+ *
+ * @template Data Type of data this worker receives from pool's execution.
+ * @template Response Type of response the worker sends back to the main thread.
*
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
* @since 0.0.1
Data extends JSONValue = JSONValue,
Response extends JSONValue = JSONValue
> extends AbstractWorker<MessagePort, Data, Response> {
+ /**
+ * Reference to main thread.
+ */
protected parent?: MessagePort
+ /**
+ * Constructs a new poolifier thread worker.
+ *
+ * @param fn Function processed by the worker when the pool's `execution` function is invoked.
+ * @param opts Options for the worker.
+ */
public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
super('worker-thread-pool:pioardi', isMainThread, fn, opts)
+/**
+ * Options for workers.
+ */
export interface WorkerOptions {
/**
- * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
+ * Maximum waiting time in milliseconds for tasks.
+ *
+ * After this time, newly created workers will be terminated.
*
* @default 60.000 ms
*/
maxInactiveTime?: number
/**
- * `true` if your function contains async pieces, else `false`.
+ * Whether your worker will perform asynchronous or not.
*
* @default false
*/