## [Unreleased]
+### Added
+
+- Support multiple task functions per worker.
+
### Changed
- Use O(1) queue implementation for tasks queueing.
You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool.
-**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**.
+**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
**Now TypeScript is also supported, find how to use it into the example folder**.
Remember that workers can only send and receive serializable data.
### `class YourWorker extends ThreadWorker/ClusterWorker`
-`fn` (mandatory) The function that you want to execute on the worker
+`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker
`opts` (optional) An object with these properties:
- `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.
})
pool
- .execute({ functionName: 'fn0', input: 'hello' })
+ .execute({ text: 'hello' }, 'fn0')
.then(res => console.log(res))
.catch(err => console.error(err))
pool
- .execute({ functionName: 'fn1', input: 'multiple functions' })
+ .execute({ text: 'multiple functions' }, 'fn1')
.then(res => console.log(res))
.catch(err => console.error(err))
'use strict'
const { ThreadWorker } = require('poolifier')
-function yourFunction (data) {
- if (data.functionName === 'fn0') {
- console.log('Executing function 0')
- return { data: '0 your input was' + data.input }
- } else if (data.functionName === 'fn1') {
- console.log('Executing function 1')
- return { data: '1 your input was' + data.input }
- }
+function fn0 (data) {
+ console.log('Executing function 0')
+ return { data: 'fn0 your input text was' + data.text }
}
-module.exports = new ThreadWorker(yourFunction)
+function fn1 (data) {
+ console.log('Executing function 1')
+ return { data: 'fn1 your input text was' + data.text }
+}
+
+module.exports = new ThreadWorker({ fn0, fn1 })
export type {
IPool,
PoolEmitter,
- PoolOptions,
PoolEvent,
+ PoolOptions,
PoolType,
TasksQueueOptions
} from './pools/pool'
export type { KillBehavior, WorkerOptions } from './worker/worker-options'
export type {
Draft,
- PromiseResponseWrapper,
MessageValue,
+ PromiseResponseWrapper,
+ TaskFunctions,
WorkerAsyncFunction,
WorkerFunction,
WorkerSyncFunction
}
/** @inheritDoc */
- public async execute (data?: Data): Promise<Response> {
+ public async execute (data?: Data, name?: string): Promise<Response> {
const [workerNodeKey, workerNode] = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
+ name,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
id: crypto.randomUUID()
* Executes the function specified in the worker constructor with the task data input parameter.
*
* @param data - The task input data for the specified worker function. This can only be serializable data.
+ * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
* @returns Promise that will be fulfilled when the task is completed.
*/
- execute: (data?: Data) => Promise<Response>
+ execute: (data?: Data, name?: string) => Promise<Response>
/**
* Shutdowns every current worker in this pool.
*/
* @internal
*/
export interface Task<Data = unknown> {
+ /**
+ * Task name.
+ */
+ readonly name?: string
/**
* Task input data that will be passed to the worker.
*/
readonly data?: Data
/**
- * UUID of the message.
+ * Message UUID.
*/
readonly id?: string
}
export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
data?: Data
) => Response
+
/**
* Worker asynchronous function that can be executed.
* This function must return a promise.
export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
data?: Data
) => Promise<Response>
+
/**
* Worker function that can be executed.
* This function can be synchronous or asynchronous.
| WorkerSyncFunction<Data, Response>
| WorkerAsyncFunction<Data, Response>
+/**
+ * Worker functions that can be executed.
+ * This object can contain synchronous or asynchronous functions.
+ * The key is the name of the function.
+ * The value is the function itself.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type TaskFunctions<Data = unknown, Response = unknown> = Record<
+string,
+WorkerFunction<Data, Response>
+>
+
/**
* An object holding the execution response promise resolve/reject callbacks.
*
import type { MessagePort } from 'node:worker_threads'
import type {
MessageValue,
+ TaskFunctions,
WorkerAsyncFunction,
WorkerFunction,
WorkerSyncFunction
import type { KillBehavior, WorkerOptions } from './worker-options'
import { KillBehaviors } from './worker-options'
+const DEFAULT_FUNCTION_NAME = 'default'
const DEFAULT_MAX_INACTIVE_TIME = 60000
const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
Data = unknown,
Response = unknown
> extends AsyncResource {
+ /**
+ * Task function(s) processed by the worker when the pool's `execution` function is invoked.
+ */
+ protected taskFunctions!: Map<string, WorkerFunction<Data, Response>>
/**
* Timestamp of the last task processed by this worker.
*/
*
* @param type - The type of async event.
* @param isMain - Whether this is the main worker or not.
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
* @param mainWorker - Reference to main worker.
* @param opts - Options for the worker.
*/
public constructor (
type: string,
protected readonly isMain: boolean,
- fn: WorkerFunction<Data, Response>,
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>,
protected mainWorker: MainWorker | undefined | null,
protected readonly opts: WorkerOptions = {
/**
) {
super(type)
this.checkWorkerOptions(this.opts)
- this.checkFunctionInput(fn)
+ this.checkTaskFunctions(taskFunctions)
if (!this.isMain) {
this.lastTaskTimestamp = performance.now()
this.aliveInterval = setInterval(
this.checkAlive.bind(this)()
}
- this.mainWorker?.on(
- 'message',
- (message: MessageValue<Data, MainWorker>) => {
- this.messageListener(message, fn)
- }
- )
+ this.mainWorker?.on('message', this.messageListener.bind(this))
}
private checkWorkerOptions (opts: WorkerOptions): void {
}
/**
- * Checks if the `fn` parameter is passed to the constructor.
+ * Checks if the `taskFunctions` parameter is passed to the constructor.
*
- * @param fn - The function that should be defined.
+ * @param taskFunctions - The task function(s) parameter that should be checked.
*/
- private checkFunctionInput (fn: WorkerFunction<Data, Response>): void {
- if (fn == null) throw new Error('fn parameter is mandatory')
- if (typeof fn !== 'function') {
- throw new TypeError('fn parameter is not a function')
+ private checkTaskFunctions (
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>
+ ): void {
+ if (taskFunctions == null) {
+ throw new Error('taskFunctions parameter is mandatory')
}
- if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
- throw new Error(
- 'fn parameter is an async function, please set the async option to true'
- )
+ if (
+ typeof taskFunctions !== 'function' &&
+ typeof taskFunctions !== 'object'
+ ) {
+ throw new Error('taskFunctions parameter is not a function or an object')
+ }
+ if (
+ typeof taskFunctions === 'object' &&
+ taskFunctions.constructor !== Object &&
+ Object.prototype.toString.call(taskFunctions) !== '[object Object]'
+ ) {
+ throw new Error('taskFunctions parameter is not an object literal')
+ }
+ this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
+ if (typeof taskFunctions !== 'function') {
+ let firstEntry = true
+ for (const [name, fn] of Object.entries(taskFunctions)) {
+ if (typeof fn !== 'function') {
+ throw new Error(
+ 'A taskFunctions parameter object value is not a function'
+ )
+ }
+ this.taskFunctions.set(name, fn.bind(this))
+ if (firstEntry) {
+ this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this))
+ firstEntry = false
+ }
+ }
+ if (firstEntry) {
+ throw new Error('taskFunctions parameter object is empty')
+ }
+ } else {
+ this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
}
}
* Worker message listener.
*
* @param message - Message received.
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
*/
- protected messageListener (
- message: MessageValue<Data, MainWorker>,
- fn: WorkerFunction<Data, Response>
- ): void {
+ protected messageListener (message: MessageValue<Data, MainWorker>): void {
if (message.id != null && message.data != null) {
// Task message received
- if (this.opts.async === true) {
+ const fn = this.getTaskFunction(message.name)
+ if (fn?.constructor.name === 'AsyncFunction') {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
} else {
- this.runInAsyncScope(this.run.bind(this), this, fn, message)
+ this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
} else if (message.parent != null) {
// Main worker reference message received
* @param fn - Function that will be executed.
* @param message - Input data for the given function.
*/
- protected run (
+ protected runSync (
fn: WorkerSyncFunction<Data, Response>,
message: MessageValue<Data>
): void {
})
.catch(EMPTY_FUNCTION)
}
+
+ /**
+ * Gets the task function in the given scope.
+ *
+ * @param name - Name of the function that will be returned.
+ */
+ private getTaskFunction (name?: string): WorkerFunction<Data, Response> {
+ name = name ?? DEFAULT_FUNCTION_NAME
+ const fn = this.taskFunctions.get(name)
+ if (fn == null) {
+ throw new Error(`Task function "${name}" not found`)
+ }
+ return fn
+ }
}
import type { Worker } from 'node:cluster'
import cluster from 'node:cluster'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+ MessageValue,
+ TaskFunctions,
+ WorkerFunction
+} from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
/**
* Constructs a new poolifier cluster worker.
*
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
* @param opts - Options for the worker.
*/
public constructor (
- fn: WorkerFunction<Data, Response>,
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>,
opts: WorkerOptions = {}
) {
super(
'worker-cluster-pool:poolifier',
cluster.isPrimary,
- fn,
+ taskFunctions,
cluster.worker,
opts
)
import type { MessagePort } from 'node:worker_threads'
import { isMainThread, parentPort } from 'node:worker_threads'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+ MessageValue,
+ TaskFunctions,
+ WorkerFunction
+} from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
/**
* Constructs a new poolifier thread worker.
*
- * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
* @param opts - Options for the worker.
*/
public constructor (
- fn: WorkerFunction<Data, Response>,
+ taskFunctions:
+ | WorkerFunction<Data, Response>
+ | TaskFunctions<Data, Response>,
opts: WorkerOptions = {}
) {
- super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
+ super(
+ 'worker-thread-pool:poolifier',
+ isMainThread,
+ taskFunctions,
+ parentPort,
+ opts
+ )
}
/** @inheritDoc */
* Whether your worker will perform asynchronous or not.
*
* @defaultValue false
+ * @deprecated This option will be removed in the next major version.
*/
async?: boolean
/**
const { expect } = require('expect')
const {
+ DynamicClusterPool,
DynamicThreadPool,
FixedClusterPool,
FixedThreadPool,
expect(poolBusy).toBe(numberOfWorkers + 1)
await pool.destroy()
})
+
+ it('Verify that multiple tasks worker is working', async () => {
+ const pool = new DynamicClusterPool(
+ numberOfWorkers,
+ numberOfWorkers * 2,
+ './tests/worker-files/cluster/testMultiTasksWorker.js'
+ )
+ const data = { n: 10 }
+ const result0 = await pool.execute(data)
+ expect(result0).toBe(false)
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toBe(false)
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(89)
+ })
})
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
--- /dev/null
+'use strict'
+const { isMaster } = require('cluster')
+const { ClusterWorker, KillBehaviors } = require('../../../lib')
+const {
+ jsonIntegerSerialization,
+ factorial,
+ fibonacci
+} = require('../../test-utils')
+
+module.exports = new ClusterWorker(
+ {
+ jsonIntegerSerialization: data => {
+ jsonIntegerSerialization(data.n)
+ return isMaster
+ },
+ factorial: data => factorial(data.n),
+ fibonacci: data => fibonacci(data.n)
+ },
+ {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ }
+)
function test (data) {
data = data || {}
data.function = data.function || WorkerFunctions.jsonIntegerSerialization
- TestUtils.executeWorkerFunction(data)
- return isMaster
+ const result = TestUtils.executeWorkerFunction(data)
+ if (result == null) {
+ return isMaster
+ }
+ return result
}
module.exports = new ClusterWorker(test, {
--- /dev/null
+'use strict'
+const { isMainThread } = require('worker_threads')
+const { ThreadWorker, KillBehaviors } = require('../../../lib')
+const {
+ jsonIntegerSerialization,
+ factorial,
+ fibonacci
+} = require('../../test-utils')
+
+module.exports = new ThreadWorker(
+ {
+ jsonIntegerSerialization: data => {
+ jsonIntegerSerialization(data.n)
+ return isMainThread
+ },
+ factorial: data => factorial(data.n),
+ fibonacci: data => fibonacci(data.n)
+ },
+ {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ }
+)
function test (data) {
data = data || {}
data.function = data.function || WorkerFunctions.jsonIntegerSerialization
- TestUtils.executeWorkerFunction(data)
- return isMainThread
+ const result = TestUtils.executeWorkerFunction(data)
+ if (result == null) {
+ return isMainThread
+ }
+ return result
}
module.exports = new ThreadWorker(test, {
expect(worker.opts.async).toBe(true)
})
- it('Verify that fn parameter is mandatory', () => {
- expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+ it('Verify that taskFunctions parameter is mandatory', () => {
+ expect(() => new ClusterWorker()).toThrowError(
+ 'taskFunctions parameter is mandatory'
+ )
})
- it('Verify that fn parameter is a function', () => {
- expect(() => new ClusterWorker({})).toThrowError(
- new TypeError('fn parameter is not a function')
+ it('Verify that taskFunctions parameter is a function or an object', () => {
+ expect(() => new ClusterWorker(0)).toThrowError(
+ new TypeError('taskFunctions parameter is not a function or an object')
)
expect(() => new ClusterWorker('')).toThrowError(
- new TypeError('fn parameter is not a function')
+ new TypeError('taskFunctions parameter is not a function or an object')
+ )
+ expect(() => new ClusterWorker(true)).toThrowError(
+ new TypeError('taskFunctions parameter is not a function or an object')
)
})
- it('Verify that async fn parameter without async option throw error', () => {
- const fn = async () => {
- return new Promise()
- }
- expect(() => new ClusterWorker(fn)).toThrowError(
- 'fn parameter is an async function, please set the async option to true'
+ it('Verify that taskFunctions parameter is not an empty object literal', () => {
+ expect(() => new ClusterWorker([])).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new Map())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new Set())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker(new WeakMap())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
)
+ expect(() => new ClusterWorker(new WeakSet())).toThrowError(
+ new TypeError('taskFunctions parameter is not an object literal')
+ )
+ expect(() => new ClusterWorker({})).toThrowError(
+ new TypeError('taskFunctions parameter object is empty')
+ )
+ })
+
+ it('Verify that taskFunctions parameter with multiple task functions is taken', () => {
+ const fn1 = () => {
+ return 1
+ }
+ const fn2 = () => {
+ return 2
+ }
+ const worker = new ClusterWorker({ fn1, fn2 })
+ expect(typeof worker.taskFunctions.get('default') === 'function').toBe(true)
+ expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true)
+ expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true)
})
it('Verify that handleError() method is working properly', () => {