WorkerChoiceStrategyContext
} from './selection-strategies'
+/**
+ * Callback invoked if the worker has received a message.
+ */
+export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
+
/**
* Callback invoked if the worker raised an error.
*/
* Basic interface that describes the minimum required implementation of listener events for a pool-worker.
*/
export interface IWorker {
+ /**
+ * Register a listener to the message event.
+ *
+ * @param event `'message'`.
+ * @param handler The message handler.
+ */
+ on(event: 'message', handler: MessageHandler<this>): void
/**
* Register a listener to the error event.
*
* Options for a poolifier pool.
*/
export interface PoolOptions<Worker> {
+ /**
+ * A function that will listen for message event on each worker.
+ */
+ messageHandler?: MessageHandler<Worker>
/**
* A function that will listen for error event on each worker.
*/
/**
* Pool events emission.
*
- * Default to true.
+ * @default true
*/
enableEvents?: boolean
}
Worker extends IWorker,
Data = unknown,
Response = unknown
-> implements IPoolInternal<Worker, Data, Response>
-{
+> implements IPoolInternal<Worker, Data, Response> {
/** @inheritdoc */
public readonly workers: Worker[] = []
this,
() => {
const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, async message => {
- const tasksInProgress = this.tasks.get(workerCreated)
+ this.registerWorkerMessageListener(workerCreated, message => {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
- tasksInProgress === 0
+ this.tasks.get(workerCreated) === 0
) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- await this.destroyWorker(workerCreated)
+ this.destroyWorker(workerCreated) as void
}
})
return workerCreated
const messageId = ++this.nextMessageId
const res = this.internalExecute(worker, messageId)
this.checkAndEmitBusy()
- this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
+ data = data ?? ({} as Data)
+ this.sendToWorker(worker, { data, id: messageId })
return res
}
protected abstract isMain (): boolean
/**
- * Increase the number of tasks that the given workers has applied.
+ * Increase the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are increased.
*/
}
/**
- * Decrease the number of tasks that the given workers has applied.
+ * Decrease the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are decreased.
*/
}
/**
- * Step the number of tasks that the given workers has applied.
+ * Step the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
* @returns New, completely set up worker.
*/
protected createAndSetupWorker (): Worker {
- const worker: Worker = this.createWorker()
+ const worker = this.createWorker()
+ worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
/**
* This function is the listener registered for each worker.
*
- * @returns The listener function to execute when a message is sent from a worker.
+ * @returns The listener function to execute when a message is received from a worker.
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.id) {
- const value = this.promiseMap.get(message.id)
- if (value) {
- this.decreaseWorkersTasks(value.worker)
- if (message.error) value.reject(message.error)
- else value.resolve(message.data as Response)
+ if (message.id !== undefined) {
+ const promise = this.promiseMap.get(message.id)
+ if (promise !== undefined) {
+ this.decreaseWorkersTasks(promise.worker)
+ if (message.error) promise.reject(message.error)
+ else promise.resolve(message.data as Response)
this.promiseMap.delete(message.id)
}
}