From 70a4f5ea122d3ddb2d2e0b245598ad241562e7f7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 5 May 2023 13:53:31 +0200 Subject: [PATCH] test: add multi tasks worker MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- examples/multifunctionWorker.js | 4 ++-- src/pools/worker.ts | 2 +- src/utility-types.ts | 5 +++- src/worker/abstract-worker.ts | 4 ++-- tests/pools/abstract/abstract-pool.test.js | 16 +++++++++++++ tests/pools/cluster/dynamic.test.js | 4 ++-- tests/pools/cluster/fixed.test.js | 4 ++-- tests/pools/thread/dynamic.test.js | 4 ++-- tests/pools/thread/fixed.test.js | 4 ++-- .../cluster/testMultiTasksWorker.js | 23 +++++++++++++++++++ tests/worker-files/cluster/testWorker.js | 7 ++++-- .../thread/testMultiTasksWorker.js | 23 +++++++++++++++++++ tests/worker-files/thread/testWorker.js | 7 ++++-- 13 files changed, 89 insertions(+), 18 deletions(-) create mode 100644 tests/worker-files/cluster/testMultiTasksWorker.js create mode 100644 tests/worker-files/thread/testMultiTasksWorker.js diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js index a38d8bb0..53217fa0 100644 --- a/examples/multifunctionWorker.js +++ b/examples/multifunctionWorker.js @@ -3,12 +3,12 @@ const { ThreadWorker } = require('poolifier') function fn0 (data) { console.log('Executing function 0') - return { data: 'fn0 your input was' + data.text } + return { data: 'fn0 your input text was' + data.text } } function fn1 (data) { console.log('Executing function 1') - return { data: 'fn1 your input was' + data.text } + return { data: 'fn1 your input text was' + data.text } } module.exports = new ThreadWorker({ fn0, fn1 }) diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 7bf86848..2db5f630 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -46,7 +46,7 @@ export interface Task { */ readonly data?: Data /** - * UUID of the message. + * Message UUID. */ readonly id?: string } diff --git a/src/utility-types.ts b/src/utility-types.ts index 54e66e1b..331085fe 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -48,6 +48,7 @@ export interface MessageValue< export type WorkerSyncFunction = ( data?: Data ) => Response + /** * Worker asynchronous function that can be executed. * This function must return a promise. @@ -58,6 +59,7 @@ export type WorkerSyncFunction = ( export type WorkerAsyncFunction = ( data?: Data ) => Promise + /** * Worker function that can be executed. * This function can be synchronous or asynchronous. @@ -68,8 +70,9 @@ export type WorkerAsyncFunction = ( export type WorkerFunction = | WorkerSyncFunction | WorkerAsyncFunction + /** - * Worker functions that can be executed object. + * Worker functions that can be executed. * This object can contain synchronous or asynchronous functions. * The key is the name of the function. * The value is the function itself. diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 2aca9e91..3ca52b69 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -148,7 +148,7 @@ export abstract class AbstractWorker< if (fn?.constructor.name === 'AsyncFunction') { this.runInAsyncScope(this.runAsync.bind(this), this, fn, message) } else { - this.runInAsyncScope(this.run.bind(this), this, fn, message) + this.runInAsyncScope(this.runSync.bind(this), this, fn, message) } } else if (message.parent != null) { // Main worker reference message received @@ -207,7 +207,7 @@ export abstract class AbstractWorker< * @param fn - Function that will be executed. * @param message - Input data for the given function. */ - protected run ( + protected runSync ( fn: WorkerSyncFunction, message: MessageValue ): void { diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 366d4965..972587b5 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,5 +1,6 @@ const { expect } = require('expect') const { + DynamicClusterPool, DynamicThreadPool, FixedClusterPool, FixedThreadPool, @@ -398,4 +399,19 @@ describe('Abstract pool test suite', () => { expect(poolBusy).toBe(numberOfWorkers + 1) await pool.destroy() }) + + it('Verify that multiple tasks worker is working', async () => { + const pool = new DynamicClusterPool( + numberOfWorkers, + numberOfWorkers * 2, + './tests/worker-files/cluster/testMultiTasksWorker.js' + ) + const data = { n: 10 } + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toBe(false) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(89) + }) }) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index e67d92bd..6d2bccad 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -19,11 +19,11 @@ describe('Dynamic cluster pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 59f76802..34c06610 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -65,11 +65,11 @@ describe('Fixed cluster pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that is possible to invoke the execute() method without input', async () => { diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 3e5a2d35..95969390 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -19,11 +19,11 @@ describe('Dynamic thread pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 9187b817..09c53ea4 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -65,11 +65,11 @@ describe('Fixed thread pool test suite', () => { let result = await pool.execute({ function: WorkerFunctions.fibonacci }) - expect(result).toBe(false) + expect(result).toBe(121393) result = await pool.execute({ function: WorkerFunctions.factorial }) - expect(result).toBe(false) + expect(result).toBe(9.33262154439441e157) }) it('Verify that is possible to invoke the execute() method without input', async () => { diff --git a/tests/worker-files/cluster/testMultiTasksWorker.js b/tests/worker-files/cluster/testMultiTasksWorker.js new file mode 100644 index 00000000..692a2e76 --- /dev/null +++ b/tests/worker-files/cluster/testMultiTasksWorker.js @@ -0,0 +1,23 @@ +'use strict' +const { isMaster } = require('cluster') +const { ClusterWorker, KillBehaviors } = require('../../../lib') +const { + jsonIntegerSerialization, + factorial, + fibonacci +} = require('../../test-utils') + +module.exports = new ClusterWorker( + { + jsonIntegerSerialization: data => { + jsonIntegerSerialization(data.n) + return isMaster + }, + factorial: data => factorial(data.n), + fibonacci: data => fibonacci(data.n) + }, + { + maxInactiveTime: 500, + killBehavior: KillBehaviors.HARD + } +) diff --git a/tests/worker-files/cluster/testWorker.js b/tests/worker-files/cluster/testWorker.js index 3f50d102..0d0611d0 100644 --- a/tests/worker-files/cluster/testWorker.js +++ b/tests/worker-files/cluster/testWorker.js @@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types') function test (data) { data = data || {} data.function = data.function || WorkerFunctions.jsonIntegerSerialization - TestUtils.executeWorkerFunction(data) - return isMaster + const result = TestUtils.executeWorkerFunction(data) + if (result == null) { + return isMaster + } + return result } module.exports = new ClusterWorker(test, { diff --git a/tests/worker-files/thread/testMultiTasksWorker.js b/tests/worker-files/thread/testMultiTasksWorker.js new file mode 100644 index 00000000..da357a2a --- /dev/null +++ b/tests/worker-files/thread/testMultiTasksWorker.js @@ -0,0 +1,23 @@ +'use strict' +const { isMainThread } = require('worker_threads') +const { ThreadWorker, KillBehaviors } = require('../../../lib') +const { + jsonIntegerSerialization, + factorial, + fibonacci +} = require('../../test-utils') + +module.exports = new ThreadWorker( + { + jsonIntegerSerialization: data => { + jsonIntegerSerialization(data.n) + return isMainThread + }, + factorial: data => factorial(data.n), + fibonacci: data => fibonacci(data.n) + }, + { + maxInactiveTime: 500, + killBehavior: KillBehaviors.HARD + } +) diff --git a/tests/worker-files/thread/testWorker.js b/tests/worker-files/thread/testWorker.js index 177ef08b..668587db 100644 --- a/tests/worker-files/thread/testWorker.js +++ b/tests/worker-files/thread/testWorker.js @@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types') function test (data) { data = data || {} data.function = data.function || WorkerFunctions.jsonIntegerSerialization - TestUtils.executeWorkerFunction(data) - return isMainThread + const result = TestUtils.executeWorkerFunction(data) + if (result == null) { + return isMainThread + } + return result } module.exports = new ThreadWorker(test, { -- 2.34.1