## [Unreleased]
+### Added
+
+- Support multiple task functions per worker.
+
## [2.4.11] - 2023-04-23
### Changed
You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool.
-**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**.
+**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
**Now TypeScript is also supported, find how to use it into the example folder**.
Remember that workers can only send and receive serializable data.
### `class YourWorker extends ThreadWorker/ClusterWorker`
-`fn` (mandatory) The function that you want to execute on the worker
+`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker
`opts` (optional) An object with these properties:
- `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.
})
pool
- .execute({ functionName: 'fn0', input: 'hello' })
+ .execute({ text: 'hello' }, 'fn0')
.then(res => console.log(res))
.catch(err => console.error(err))
pool
- .execute({ functionName: 'fn1', input: 'multiple functions' })
+ .execute({ text: 'multiple functions' }, 'fn1')
.then(res => console.log(res))
.catch(err => console.error(err))
'use strict'
const { ThreadWorker } = require('poolifier')
-function yourFunction (data) {
- if (data.functionName === 'fn0') {
- console.log('Executing function 0')
- return { data: '0 your input was' + data.input }
- } else if (data.functionName === 'fn1') {
- console.log('Executing function 1')
- return { data: '1 your input was' + data.input }
- }
+function fn0 (data) {
+ console.log('Executing function 0')
+ return { data: 'fn0 your input was' + data.text }
}
-module.exports = new ThreadWorker(yourFunction)
+function fn1 (data) {
+ console.log('Executing function 1')
+ return { data: 'fn1 your input was' + data.text }
+}
+
+module.exports = new ThreadWorker({ fn0, fn1 })
}
/** @inheritDoc */
- public async execute (data?: Data): Promise<Response> {
+ public async execute (data?: Data, name?: string): Promise<Response> {
const [workerNodeKey, workerNode] = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
+ name,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
id: crypto.randomUUID()
* Executes the function specified in the worker constructor with the task data input parameter.
*
* @param data - The task input data for the specified worker function. This can only be serializable data.
+ * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
* @returns Promise that will be fulfilled when the task is completed.
*/
- execute: (data?: Data) => Promise<Response>
+ execute: (data?: Data, name?: string) => Promise<Response>
/**
* Shutdowns every current worker in this pool.
*/
* @internal
*/
export interface Task<Data = unknown> {
+ /**
+ * Task name.
+ */
+ readonly name?: string
/**
* Task input data that will be passed to the worker.
*/
export type WorkerFunction<Data = unknown, Response = unknown> =
| WorkerSyncFunction<Data, Response>
| WorkerAsyncFunction<Data, Response>
+/**
+ * Worker functions object that can be executed.
+ * This object can contain synchronous or asynchronous functions.
+ * The key is the name of the function.
+ * The value is the function itself.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type TaskFunctions<Data = unknown, Response = unknown> = Record<
+string,
+WorkerFunction<Data, Response>
+>
/**
* An object holding the execution response promise resolve/reject callbacks.
import { AsyncResource } from 'node:async_hooks'
import type { Worker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
-import type {
- MessageValue,
- WorkerAsyncFunction,
- WorkerFunction,
- WorkerSyncFunction
+import {
+ type MessageValue,
+ type TaskFunctions,
+ type WorkerAsyncFunction,
+ type WorkerFunction,
+ type WorkerSyncFunction
} from '../utility-types'
import { EMPTY_FUNCTION } from '../utils'
import type { KillBehavior, WorkerOptions } from './worker-options'
Data = unknown,
Response = unknown
> extends AsyncResource {
+ /**
+ * Task function(s) processed by the worker when the pool's `execution` function is invoked.
+ */
+ protected taskFunctions!: Map<string, WorkerFunction<Data, Response>>
/**
* Timestamp of the last task processed by this worker.
*/
*
* @param type - The type of async event.
* @param isMain - Whether this is the main worker or not.
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
* @param mainWorker - Reference to main worker.
* @param opts - Options for the worker.
*/
public constructor (
type: string,
protected readonly isMain: boolean,
- fn: WorkerFunction<Data, Response>,
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>,
protected mainWorker: MainWorker | undefined | null,
protected readonly opts: WorkerOptions = {
/**
) {
super(type)
this.checkWorkerOptions(this.opts)
- this.checkFunctionInput(fn)
+ this.checkTaskFunctions(taskFunctions)
if (!this.isMain) {
this.lastTaskTimestamp = performance.now()
this.aliveInterval = setInterval(
this.mainWorker?.on(
'message',
(message: MessageValue<Data, MainWorker>) => {
- this.messageListener(message, fn)
+ this.messageListener(message)
}
)
}
}
/**
- * Checks if the `fn` parameter is passed to the constructor.
+ * Checks if the `taskFunctions` parameter is passed to the constructor.
*
- * @param fn - The function that should be defined.
+ * @param taskFunctions - The task function(s) that should be defined.
*/
- private checkFunctionInput (fn: WorkerFunction<Data, Response>): void {
- if (fn == null) throw new Error('fn parameter is mandatory')
- if (typeof fn !== 'function') {
- throw new TypeError('fn parameter is not a function')
+ private checkTaskFunctions (
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>
+ ): void {
+ if (taskFunctions == null) { throw new Error('taskFunctions parameter is mandatory') }
+ if (
+ typeof taskFunctions !== 'function' &&
+ typeof taskFunctions !== 'object'
+ ) {
+ throw new Error('taskFunctions parameter is not a function or an object')
}
- if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
- throw new Error(
- 'fn parameter is an async function, please set the async option to true'
- )
+ if (
+ typeof taskFunctions === 'object' &&
+ taskFunctions.constructor !== Object &&
+ Object.prototype.toString.call(taskFunctions) !== '[object Object]'
+ ) {
+ throw new Error('taskFunctions parameter is not an object literal')
+ }
+ this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
+ if (typeof taskFunctions !== 'function') {
+ for (const [name, fn] of Object.entries(taskFunctions)) {
+ if (typeof fn !== 'function') {
+ throw new Error(
+ 'A taskFunctions parameter object value is not a function'
+ )
+ }
+ this.taskFunctions.set(name, fn.bind(this))
+ }
+ } else {
+ this.taskFunctions.set('default', taskFunctions.bind(this))
}
}
* Worker message listener.
*
* @param message - Message received.
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
*/
- protected messageListener (
- message: MessageValue<Data, MainWorker>,
- fn: WorkerFunction<Data, Response>
- ): void {
+ protected messageListener (message: MessageValue<Data, MainWorker>): void {
if (message.id != null && message.data != null) {
+ let fn: WorkerFunction<Data, Response> | undefined
+ if (message.name == null) {
+ fn = this.taskFunctions.get('default')
+ } else {
+ fn = this.taskFunctions.get(message.name)
+ }
// Task message received
- if (this.opts.async === true) {
+ if (fn?.constructor.name === 'AsyncFunction') {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
} else {
this.runInAsyncScope(this.run.bind(this), this, fn, message)
import type { Worker } from 'node:cluster'
import cluster from 'node:cluster'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+ MessageValue,
+ TaskFunctions,
+ WorkerFunction
+} from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
/**
* Constructs a new poolifier cluster worker.
*
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
* @param opts - Options for the worker.
*/
public constructor (
- fn: WorkerFunction<Data, Response>,
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>,
opts: WorkerOptions = {}
) {
super(
'worker-cluster-pool:poolifier',
cluster.isPrimary,
- fn,
+ taskFunctions,
cluster.worker,
opts
)
import type { MessagePort } from 'node:worker_threads'
import { isMainThread, parentPort } from 'node:worker_threads'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+ MessageValue,
+ TaskFunctions,
+ WorkerFunction
+} from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
* @param opts - Options for the worker.
*/
public constructor (
- fn: WorkerFunction<Data, Response>,
+ fn: WorkerFunction<Data, Response> | TaskFunctions<Data, Response>,
opts: WorkerOptions = {}
) {
super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
* Whether your worker will perform asynchronous or not.
*
* @defaultValue false
+ * @deprecated This option will be removed in the next major version.
*/
async?: boolean
/**
expect(worker.opts.async).toBe(true)
})
- it('Verify that fn parameter is mandatory', () => {
- expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+ it('Verify that taskFunctions parameter is mandatory', () => {
+ expect(() => new ClusterWorker()).toThrowError(
+ 'taskFunctions parameter is mandatory'
+ )
})
- it('Verify that fn parameter is a function', () => {
- expect(() => new ClusterWorker({})).toThrowError(
- new TypeError('fn parameter is not a function')
+ it('Verify that taskFunctions parameter is a function or an object', () => {
+ expect(() => new ClusterWorker(0)).toThrowError(
+ new TypeError('taskFunctions parameter is not a function or an object')
)
expect(() => new ClusterWorker('')).toThrowError(
- new TypeError('fn parameter is not a function')
+ new TypeError('taskFunctions parameter is not a function or an object')
+ )
+ expect(() => new ClusterWorker(true)).toThrowError(
+ new TypeError('taskFunctions parameter is not a function or an object')
)
})
- it('Verify that async fn parameter without async option throw error', () => {
- const fn = async () => {
- return new Promise()
- }
- expect(() => new ClusterWorker(fn)).toThrowError(
- 'fn parameter is an async function, please set the async option to true'
+ it('Verify that taskFunctions parameter is an object literal', () => {
+ expect(() => new ClusterWorker([])).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new Map())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new Set())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new WeakMap())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
)
+ expect(() => new ClusterWorker(new WeakSet())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ })
+
+ it('Verify that taskFunctions parameter with multiple task functions is taken', () => {
+ const fn1 = () => {
+ return 1
+ }
+ const fn2 = () => {
+ return 2
+ }
+ const worker = new ClusterWorker({ fn1, fn2 })
+ expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true)
+ expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true)
})
it('Verify that handleError() method is working properly', () => {