From: Jérôme Benoit Date: Fri, 5 May 2023 14:44:07 +0000 (+0200) Subject: Merge pull request #747 from poolifier/multiple-functions X-Git-Tag: v2.4.12~15 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=feec4213515cceb460e722dedca403914764b487;hp=1e8c193fa4e519921b6db1b0d9d443cb15ecf12a;p=poolifier.git Merge pull request #747 from poolifier/multiple-functions --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d11d6f2..b3ebe01c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Support multiple task functions per worker. + ### Changed - Use O(1) queue implementation for tasks queueing. diff --git a/README.md b/README.md index 857d9c26..7767c75c 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ pool.execute({}).then(res => { You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool. -**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**. +**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**. **Now TypeScript is also supported, find how to use it into the example folder**. Remember that workers can only send and receive serializable data. @@ -209,7 +209,7 @@ This method will call the terminate method on each worker. ### `class YourWorker extends ThreadWorker/ClusterWorker` -`fn` (mandatory) The function that you want to execute on the worker +`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker `opts` (optional) An object with these properties: - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die. diff --git a/examples/multiFunctionExample.js b/examples/multiFunctionExample.js index 31fd8809..213736ea 100644 --- a/examples/multiFunctionExample.js +++ b/examples/multiFunctionExample.js @@ -5,11 +5,11 @@ const pool = new FixedThreadPool(15, './multiFunctionWorker.js', { }) pool - .execute({ functionName: 'fn0', input: 'hello' }) + .execute({ text: 'hello' }, 'fn0') .then(res => console.log(res)) .catch(err => console.error(err)) pool - .execute({ functionName: 'fn1', input: 'multiple functions' }) + .execute({ text: 'multiple functions' }, 'fn1') .then(res => console.log(res)) .catch(err => console.error(err)) diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js index 61369cfa..53217fa0 100644 --- a/examples/multifunctionWorker.js +++ b/examples/multifunctionWorker.js @@ -1,14 +1,14 @@ 'use strict' const { ThreadWorker } = require('poolifier') -function yourFunction (data) { - if (data.functionName === 'fn0') { - console.log('Executing function 0') - return { data: '0 your input was' + data.input } - } else if (data.functionName === 'fn1') { - console.log('Executing function 1') - return { data: '1 your input was' + data.input } - } +function fn0 (data) { + console.log('Executing function 0') + return { data: 'fn0 your input text was' + data.text } } -module.exports = new ThreadWorker(yourFunction) +function fn1 (data) { + console.log('Executing function 1') + return { data: 'fn1 your input text was' + data.text } +} + +module.exports = new ThreadWorker({ fn0, fn1 }) diff --git a/src/index.ts b/src/index.ts index df5358ae..03ee2809 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,8 +6,8 @@ export { PoolEvents } from './pools/pool' export type { IPool, PoolEmitter, - PoolOptions, PoolEvent, + PoolOptions, PoolType, TasksQueueOptions } from './pools/pool' @@ -39,8 +39,9 @@ export { KillBehaviors } from './worker/worker-options' export type { KillBehavior, WorkerOptions } from './worker/worker-options' export type { Draft, - PromiseResponseWrapper, MessageValue, + PromiseResponseWrapper, + TaskFunctions, WorkerAsyncFunction, WorkerFunction, WorkerSyncFunction diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 06cdc3cd..edf2b29a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -305,9 +305,10 @@ export abstract class AbstractPool< } /** @inheritDoc */ - public async execute (data?: Data): Promise { + public async execute (data?: Data, name?: string): Promise { const [workerNodeKey, workerNode] = this.chooseWorkerNode() const submittedTask: Task = { + name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), id: crypto.randomUUID() diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f6188d8e..992aaaf1 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -144,9 +144,10 @@ export interface IPool< * Executes the function specified in the worker constructor with the task data input parameter. * * @param data - The task input data for the specified worker function. This can only be serializable data. + * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed. * @returns Promise that will be fulfilled when the task is completed. */ - execute: (data?: Data) => Promise + execute: (data?: Data, name?: string) => Promise /** * Shutdowns every current worker in this pool. */ diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 716376fa..2db5f630 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -37,12 +37,16 @@ export type ExitHandler = ( * @internal */ export interface Task { + /** + * Task name. + */ + readonly name?: string /** * Task input data that will be passed to the worker. */ readonly data?: Data /** - * UUID of the message. + * Message UUID. */ readonly id?: string } diff --git a/src/utility-types.ts b/src/utility-types.ts index ea90afd6..331085fe 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -48,6 +48,7 @@ export interface MessageValue< export type WorkerSyncFunction = ( data?: Data ) => Response + /** * Worker asynchronous function that can be executed. * This function must return a promise. @@ -58,6 +59,7 @@ export type WorkerSyncFunction = ( export type WorkerAsyncFunction = ( data?: Data ) => Promise + /** * Worker function that can be executed. * This function can be synchronous or asynchronous. @@ -69,6 +71,20 @@ export type WorkerFunction = | WorkerSyncFunction | WorkerAsyncFunction +/** + * Worker functions that can be executed. + * This object can contain synchronous or asynchronous functions. + * The key is the name of the function. + * The value is the function itself. + * + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export type TaskFunctions = Record< +string, +WorkerFunction +> + /** * An object holding the execution response promise resolve/reject callbacks. * diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index fbf99ab8..8b461f7d 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -3,6 +3,7 @@ import type { Worker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import type { MessageValue, + TaskFunctions, WorkerAsyncFunction, WorkerFunction, WorkerSyncFunction @@ -11,6 +12,7 @@ import { EMPTY_FUNCTION } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' import { KillBehaviors } from './worker-options' +const DEFAULT_FUNCTION_NAME = 'default' const DEFAULT_MAX_INACTIVE_TIME = 60000 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT @@ -26,6 +28,10 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { + /** + * Task function(s) processed by the worker when the pool's `execution` function is invoked. + */ + protected taskFunctions!: Map> /** * Timestamp of the last task processed by this worker. */ @@ -39,14 +45,16 @@ export abstract class AbstractWorker< * * @param type - The type of async event. * @param isMain - Whether this is the main worker or not. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function. * @param mainWorker - Reference to main worker. * @param opts - Options for the worker. */ public constructor ( type: string, protected readonly isMain: boolean, - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, protected mainWorker: MainWorker | undefined | null, protected readonly opts: WorkerOptions = { /** @@ -62,7 +70,7 @@ export abstract class AbstractWorker< ) { super(type) this.checkWorkerOptions(this.opts) - this.checkFunctionInput(fn) + this.checkTaskFunctions(taskFunctions) if (!this.isMain) { this.lastTaskTimestamp = performance.now() this.aliveInterval = setInterval( @@ -72,12 +80,7 @@ export abstract class AbstractWorker< this.checkAlive.bind(this)() } - this.mainWorker?.on( - 'message', - (message: MessageValue) => { - this.messageListener(message, fn) - } - ) + this.mainWorker?.on('message', this.messageListener.bind(this)) } private checkWorkerOptions (opts: WorkerOptions): void { @@ -88,19 +91,51 @@ export abstract class AbstractWorker< } /** - * Checks if the `fn` parameter is passed to the constructor. + * Checks if the `taskFunctions` parameter is passed to the constructor. * - * @param fn - The function that should be defined. + * @param taskFunctions - The task function(s) parameter that should be checked. */ - private checkFunctionInput (fn: WorkerFunction): void { - if (fn == null) throw new Error('fn parameter is mandatory') - if (typeof fn !== 'function') { - throw new TypeError('fn parameter is not a function') + private checkTaskFunctions ( + taskFunctions: + | WorkerFunction + | TaskFunctions + ): void { + if (taskFunctions == null) { + throw new Error('taskFunctions parameter is mandatory') } - if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) { - throw new Error( - 'fn parameter is an async function, please set the async option to true' - ) + if ( + typeof taskFunctions !== 'function' && + typeof taskFunctions !== 'object' + ) { + throw new Error('taskFunctions parameter is not a function or an object') + } + if ( + typeof taskFunctions === 'object' && + taskFunctions.constructor !== Object && + Object.prototype.toString.call(taskFunctions) !== '[object Object]' + ) { + throw new Error('taskFunctions parameter is not an object literal') + } + this.taskFunctions = new Map>() + if (typeof taskFunctions !== 'function') { + let firstEntry = true + for (const [name, fn] of Object.entries(taskFunctions)) { + if (typeof fn !== 'function') { + throw new Error( + 'A taskFunctions parameter object value is not a function' + ) + } + this.taskFunctions.set(name, fn.bind(this)) + if (firstEntry) { + this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this)) + firstEntry = false + } + } + if (firstEntry) { + throw new Error('taskFunctions parameter object is empty') + } + } else { + this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this)) } } @@ -108,18 +143,15 @@ export abstract class AbstractWorker< * Worker message listener. * * @param message - Message received. - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. */ - protected messageListener ( - message: MessageValue, - fn: WorkerFunction - ): void { + protected messageListener (message: MessageValue): void { if (message.id != null && message.data != null) { // Task message received - if (this.opts.async === true) { + const fn = this.getTaskFunction(message.name) + if (fn?.constructor.name === 'AsyncFunction') { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) } else { - this.runInAsyncScope(this.run.bind(this), this, fn, message) + this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } } else if (message.parent != null) { // Main worker reference message received @@ -178,7 +210,7 @@ export abstract class AbstractWorker< * @param fn - Function that will be executed. * @param message - Input data for the given function. */ - protected run ( + protected runSync ( fn: WorkerSyncFunction, message: MessageValue ): void { @@ -229,4 +261,18 @@ export abstract class AbstractWorker< }) .catch(EMPTY_FUNCTION) } + + /** + * Gets the task function in the given scope. + * + * @param name - Name of the function that will be returned. + */ + private getTaskFunction (name?: string): WorkerFunction { + name = name ?? DEFAULT_FUNCTION_NAME + const fn = this.taskFunctions.get(name) + if (fn == null) { + throw new Error(`Task function "${name}" not found`) + } + return fn + } } diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 8725dc89..49b0f984 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -1,6 +1,10 @@ import type { Worker } from 'node:cluster' import cluster from 'node:cluster' -import type { MessageValue, WorkerFunction } from '../utility-types' +import type { + MessageValue, + TaskFunctions, + WorkerFunction +} from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -25,17 +29,19 @@ export class ClusterWorker< /** * Constructs a new poolifier cluster worker. * - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. * @param opts - Options for the worker. */ public constructor ( - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, opts: WorkerOptions = {} ) { super( 'worker-cluster-pool:poolifier', cluster.isPrimary, - fn, + taskFunctions, cluster.worker, opts ) diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 03566a4c..75cd9da5 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,6 +1,10 @@ import type { MessagePort } from 'node:worker_threads' import { isMainThread, parentPort } from 'node:worker_threads' -import type { MessageValue, WorkerFunction } from '../utility-types' +import type { + MessageValue, + TaskFunctions, + WorkerFunction +} from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -25,14 +29,22 @@ export class ThreadWorker< /** * Constructs a new poolifier thread worker. * - * @param fn - Function processed by the worker when the pool's `execution` function is invoked. + * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. * @param opts - Options for the worker. */ public constructor ( - fn: WorkerFunction, + taskFunctions: + | WorkerFunction + | TaskFunctions, opts: WorkerOptions = {} ) { - super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts) + super( + 'worker-thread-pool:poolifier', + isMainThread, + taskFunctions, + parentPort, + opts + ) } /** @inheritDoc */ diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 9ef50ade..791f915d 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -53,6 +53,7 @@ export interface WorkerOptions { * Whether your worker will perform asynchronous or not. * * @defaultValue false + * @deprecated This option will be removed in the next major version. */ async?: boolean /** diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 366d4965..81ae3649 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,5 +1,6 @@ const { expect } = require('expect') const { + DynamicClusterPool, DynamicThreadPool, FixedClusterPool, FixedThreadPool, @@ -398,4 +399,21 @@ describe('Abstract pool test suite', () => { expect(poolBusy).toBe(numberOfWorkers + 1) await pool.destroy() }) + + it('Verify that multiple tasks worker is working', async () => { + const pool = new DynamicClusterPool( + numberOfWorkers, + numberOfWorkers * 2, + './tests/worker-files/cluster/testMultiTasksWorker.js' + ) + const data = { n: 10 } + const result0 = await pool.execute(data) + expect(result0).toBe(false) + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toBe(false) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(89) + }) }) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index e67d92bd..6d2bccad 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -19,11 +19,11 @@ describe('Dynamic cluster pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 59f76802..34c06610 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -65,11 +65,11 @@ describe('Fixed cluster pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that is possible to invoke the execute() method without input', async () => { diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 3e5a2d35..95969390 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -19,11 +19,11 @@ describe('Dynamic thread pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 9187b817..09c53ea4 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -65,11 +65,11 @@ describe('Fixed thread pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that is possible to invoke the execute() method without input', async () => { diff --git a/tests/worker-files/cluster/testMultiTasksWorker.js b/tests/worker-files/cluster/testMultiTasksWorker.js new file mode 100644 index 00000000..692a2e76 --- /dev/null +++ b/tests/worker-files/cluster/testMultiTasksWorker.js @@ -0,0 +1,23 @@ +'use strict' +const { isMaster } = require('cluster') +const { ClusterWorker, KillBehaviors } = require('../../../lib') +const { + jsonIntegerSerialization, + factorial, + fibonacci +} = require('../../test-utils') + +module.exports = new ClusterWorker( + { + jsonIntegerSerialization: data => { + jsonIntegerSerialization(data.n) + return isMaster + }, + factorial: data => factorial(data.n), + fibonacci: data => fibonacci(data.n) + }, + { + maxInactiveTime: 500, + killBehavior: KillBehaviors.HARD + } +) diff --git a/tests/worker-files/cluster/testWorker.js b/tests/worker-files/cluster/testWorker.js index 3f50d102..0d0611d0 100644 --- a/tests/worker-files/cluster/testWorker.js +++ b/tests/worker-files/cluster/testWorker.js @@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types') function test (data) { data = data || {} data.function = data.function || WorkerFunctions.jsonIntegerSerialization - TestUtils.executeWorkerFunction(data) - return isMaster + const result = TestUtils.executeWorkerFunction(data) + if (result == null) { + return isMaster + } + return result } module.exports = new ClusterWorker(test, { diff --git a/tests/worker-files/thread/testMultiTasksWorker.js b/tests/worker-files/thread/testMultiTasksWorker.js new file mode 100644 index 00000000..da357a2a --- /dev/null +++ b/tests/worker-files/thread/testMultiTasksWorker.js @@ -0,0 +1,23 @@ +'use strict' +const { isMainThread } = require('worker_threads') +const { ThreadWorker, KillBehaviors } = require('../../../lib') +const { + jsonIntegerSerialization, + factorial, + fibonacci +} = require('../../test-utils') + +module.exports = new ThreadWorker( + { + jsonIntegerSerialization: data => { + jsonIntegerSerialization(data.n) + return isMainThread + }, + factorial: data => factorial(data.n), + fibonacci: data => fibonacci(data.n) + }, + { + maxInactiveTime: 500, + killBehavior: KillBehaviors.HARD + } +) diff --git a/tests/worker-files/thread/testWorker.js b/tests/worker-files/thread/testWorker.js index 177ef08b..668587db 100644 --- a/tests/worker-files/thread/testWorker.js +++ b/tests/worker-files/thread/testWorker.js @@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types') function test (data) { data = data || {} data.function = data.function || WorkerFunctions.jsonIntegerSerialization - TestUtils.executeWorkerFunction(data) - return isMainThread + const result = TestUtils.executeWorkerFunction(data) + if (result == null) { + return isMainThread + } + return result } module.exports = new ThreadWorker(test, { diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index b62ba942..7f05cbb7 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -27,26 +27,56 @@ describe('Abstract worker test suite', () => { expect(worker.opts.async).toBe(true) }) - it('Verify that fn parameter is mandatory', () => { - expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory') + it('Verify that taskFunctions parameter is mandatory', () => { + expect(() => new ClusterWorker()).toThrowError( + 'taskFunctions parameter is mandatory' + ) }) - it('Verify that fn parameter is a function', () => { - expect(() => new ClusterWorker({})).toThrowError( - new TypeError('fn parameter is not a function') + it('Verify that taskFunctions parameter is a function or an object', () => { + expect(() => new ClusterWorker(0)).toThrowError( + new TypeError('taskFunctions parameter is not a function or an object') ) expect(() => new ClusterWorker('')).toThrowError( - new TypeError('fn parameter is not a function') + new TypeError('taskFunctions parameter is not a function or an object') + ) + expect(() => new ClusterWorker(true)).toThrowError( + new TypeError('taskFunctions parameter is not a function or an object') ) }) - it('Verify that async fn parameter without async option throw error', () => { - const fn = async () => { - return new Promise() - } - expect(() => new ClusterWorker(fn)).toThrowError( - 'fn parameter is an async function, please set the async option to true' + it('Verify that taskFunctions parameter is not an empty object literal', () => { + expect(() => new ClusterWorker([])).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new Map())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new Set())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker(new WeakMap())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') ) + expect(() => new ClusterWorker(new WeakSet())).toThrowError( + new TypeError('taskFunctions parameter is not an object literal') + ) + expect(() => new ClusterWorker({})).toThrowError( + new TypeError('taskFunctions parameter object is empty') + ) + }) + + it('Verify that taskFunctions parameter with multiple task functions is taken', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const worker = new ClusterWorker({ fn1, fn2 }) + expect(typeof worker.taskFunctions.get('default') === 'function').toBe(true) + expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true) + expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true) }) it('Verify that handleError() method is working properly', () => {