} from '../utility-types'
import { EMPTY_FUNCTION } from '../utils'
import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
+import type { PoolOptions } from './pool'
import type { IPoolInternal } from './pool-internal'
import { PoolEmitter, PoolType } from './pool-internal'
-import type { WorkerChoiceStrategy } from './selection-strategies'
import {
WorkerChoiceStrategies,
- WorkerChoiceStrategyContext
-} from './selection-strategies'
-
-/**
- * Callback invoked if the worker has received a message.
- */
-export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
-
-/**
- * Callback invoked if the worker raised an error.
- */
-export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
-
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker> = (this: Worker) => void
-
-/**
- * Callback invoked when the worker exits successfully.
- */
-export type ExitHandler<Worker> = (this: Worker, code: number) => void
-
-/**
- * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
- */
-export interface IWorker {
- /**
- * Register a listener to the message event.
- *
- * @param event `'message'`.
- * @param handler The message handler.
- */
- on(event: 'message', handler: MessageHandler<this>): void
- /**
- * Register a listener to the error event.
- *
- * @param event `'error'`.
- * @param handler The error handler.
- */
- on(event: 'error', handler: ErrorHandler<this>): void
- /**
- * Register a listener to the online event.
- *
- * @param event `'online'`.
- * @param handler The online handler.
- */
- on(event: 'online', handler: OnlineHandler<this>): void
- /**
- * Register a listener to the exit event.
- *
- * @param event `'exit'`.
- * @param handler The exit handler.
- */
- on(event: 'exit', handler: ExitHandler<this>): void
- /**
- * Register a listener to the exit event that will only performed once.
- *
- * @param event `'exit'`.
- * @param handler The exit handler.
- */
- once(event: 'exit', handler: ExitHandler<this>): void
-}
-
-/**
- * Options for a poolifier pool.
- */
-export interface PoolOptions<Worker> {
- /**
- * A function that will listen for message event on each worker.
- */
- messageHandler?: MessageHandler<Worker>
- /**
- * A function that will listen for error event on each worker.
- */
- errorHandler?: ErrorHandler<Worker>
- /**
- * A function that will listen for online event on each worker.
- */
- onlineHandler?: OnlineHandler<Worker>
- /**
- * A function that will listen for exit event on each worker.
- */
- exitHandler?: ExitHandler<Worker>
- /**
- * The work choice strategy to use in this pool.
- */
- workerChoiceStrategy?: WorkerChoiceStrategy
- /**
- * Pool events emission.
- *
- * @default true
- */
- enableEvents?: boolean
-}
+ WorkerChoiceStrategy
+} from './selection-strategies/selection-strategies-types'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
/**
* Base class containing some shared logic for all poolifier pools.
* @template Response Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractPool<
- Worker extends IWorker,
+ Worker extends AbstractPoolWorker,
Data = unknown,
Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
this.registerWorkerMessageListener(workerCreated, message => {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
- this.tasks.get(workerCreated) === 0
+ this.getWorkerRunningTasks(workerCreated) === 0
) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
this.destroyWorker(workerCreated) as void
return this.promiseMap.size
}
+ /** @inheritdoc */
+ public getWorkerRunningTasks (worker: Worker): number | undefined {
+ return this.tasks.get(worker)
+ }
+
+ /** @inheritdoc */
+ public getWorkerIndex (worker: Worker): number {
+ return this.workers.indexOf(worker)
+ }
+
/** @inheritdoc */
public setWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
protected internalGetBusyStatus (): boolean {
return (
this.numberOfRunningTasks >= this.numberOfWorkers &&
- this.findFreeTasksMapEntry() === false
+ this.findFreeWorker() === false
)
}
/** @inheritdoc */
- public findFreeTasksMapEntry (): [Worker, number] | false {
- for (const [worker, numberOfTasks] of this.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, return the matching tasks map entry
- return [worker, numberOfTasks]
+ public findFreeWorker (): Worker | false {
+ for (const worker of this.workers) {
+ if (this.getWorkerRunningTasks(worker) === 0) {
+ // A worker is free, return the matching worker
+ return worker
}
}
return false
*/
protected removeWorker (worker: Worker): void {
// Clean worker from data structure
- const workerIndex = this.workers.indexOf(worker)
- this.workers.splice(workerIndex, 1)
+ this.workers.splice(this.getWorkerIndex(worker), 1)
this.tasks.delete(worker)
}