The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [2.1.0] - 2021-dd-mm
+## [2.1.0] - 2021-28-08
+
+### Added
+
+- Add an optional pool option `messageHandler` to `PoolOptions<Worker>` for registering a message handler callback on each worker.
### Breaking Changes
WorkerChoiceStrategyContext
} from './selection-strategies'
+/**
+ * Callback invoked if the worker has received a message.
+ */
+export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
+
/**
* Callback invoked if the worker raised an error.
*/
* Basic interface that describes the minimum required implementation of listener events for a pool-worker.
*/
export interface IWorker {
+ /**
+ * Register a listener to the message event.
+ *
+ * @param event `'message'`.
+ * @param handler The message handler.
+ */
+ on(event: 'message', handler: MessageHandler<this>): void
/**
* Register a listener to the error event.
*
* Options for a poolifier pool.
*/
export interface PoolOptions<Worker> {
+ /**
+ * A function that will listen for message event on each worker.
+ */
+ messageHandler?: MessageHandler<Worker>
/**
* A function that will listen for error event on each worker.
*/
/**
* Pool events emission.
*
- * Default to true.
+ * @default true
*/
enableEvents?: boolean
}
protected abstract isMain (): boolean
/**
- * Increase the number of tasks that the given workers has applied.
+ * Increase the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are increased.
*/
}
/**
- * Decrease the number of tasks that the given workers has applied.
+ * Decrease the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are decreased.
*/
}
/**
- * Step the number of tasks that the given workers has applied.
+ * Step the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
protected createAndSetupWorker (): Worker {
const worker: Worker = this.createWorker()
+ worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
* @param min Minimum number of workers which are always active.
* @param max Maximum number of workers that can be created by this pool.
* @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
- * @param opts Options for this dynamic cluster pool. Default: `{}`
+ * @param [opts={}] Options for this dynamic cluster pool.
*/
public constructor (
min: number,
*
* @param numberOfWorkers Number of workers for this pool.
* @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
- * @param opts Options for this fixed cluster pool. Default: `{}`
+ * @param [opts={}] Options for this fixed cluster pool.
*/
public constructor (
numberOfWorkers: number,
* @param min Minimum number of threads which are always active.
* @param max Maximum number of threads that can be created by this pool.
* @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
- * @param opts Options for this dynamic thread pool. Default: `{}`
+ * @param [opts={}] Options for this dynamic thread pool.
*/
public constructor (
min: number,
*
* @param numberOfThreads Number of threads for this pool.
* @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
- * @param opts Options for this fixed thread pool. Default: `{}`
+ * @param [opts={}] Options for this fixed thread pool.
*/
public constructor (
numberOfThreads: number,
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
+ expect(pool.opts.messageHandler).toBeUndefined()
+ expect(pool.opts.errorHandler).toBeUndefined()
+ expect(pool.opts.onlineHandler).toBeUndefined()
+ expect(pool.opts.exitHandler).toBeUndefined()
pool.destroy()
+ const testHandler = () => console.log('test handler executed')
pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js',
{
workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED,
- enableEvents: false
+ enableEvents: false,
+ messageHandler: testHandler,
+ errorHandler: testHandler,
+ onlineHandler: testHandler,
+ exitHandler: testHandler
}
)
expect(pool.opts.enableEvents).toBe(false)
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.LESS_RECENTLY_USED
)
+ expect(pool.opts.messageHandler).toStrictEqual(testHandler)
+ expect(pool.opts.errorHandler).toStrictEqual(testHandler)
+ expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
+ expect(pool.opts.exitHandler).toStrictEqual(testHandler)
pool.destroy()
})