function fn0 (data) {
console.log('Executing function 0')
- return { data: 'fn0 your input was' + data.text }
+ return { data: 'fn0 your input text was' + data.text }
}
function fn1 (data) {
console.log('Executing function 1')
- return { data: 'fn1 your input was' + data.text }
+ return { data: 'fn1 your input text was' + data.text }
}
module.exports = new ThreadWorker({ fn0, fn1 })
*/
readonly data?: Data
/**
- * UUID of the message.
+ * Message UUID.
*/
readonly id?: string
}
export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
data?: Data
) => Response
+
/**
* Worker asynchronous function that can be executed.
* This function must return a promise.
export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
data?: Data
) => Promise<Response>
+
/**
* Worker function that can be executed.
* This function can be synchronous or asynchronous.
export type WorkerFunction<Data = unknown, Response = unknown> =
| WorkerSyncFunction<Data, Response>
| WorkerAsyncFunction<Data, Response>
+
/**
- * Worker functions that can be executed object.
+ * Worker functions that can be executed.
* This object can contain synchronous or asynchronous functions.
* The key is the name of the function.
* The value is the function itself.
if (fn?.constructor.name === 'AsyncFunction') {
this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
} else {
- this.runInAsyncScope(this.run.bind(this), this, fn, message)
+ this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
} else if (message.parent != null) {
// Main worker reference message received
* @param fn - Function that will be executed.
* @param message - Input data for the given function.
*/
- protected run (
+ protected runSync (
fn: WorkerSyncFunction<Data, Response>,
message: MessageValue<Data>
): void {
const { expect } = require('expect')
const {
+ DynamicClusterPool,
DynamicThreadPool,
FixedClusterPool,
FixedThreadPool,
expect(poolBusy).toBe(numberOfWorkers + 1)
await pool.destroy()
})
+
+ it('Verify that multiple tasks worker is working', async () => {
+ const pool = new DynamicClusterPool(
+ numberOfWorkers,
+ numberOfWorkers * 2,
+ './tests/worker-files/cluster/testMultiTasksWorker.js'
+ )
+ const data = { n: 10 }
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toBe(false)
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(89)
+ })
})
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
let result = await pool.execute({
function: WorkerFunctions.fibonacci
})
- expect(result).toBe(false)
+ expect(result).toBe(121393)
result = await pool.execute({
function: WorkerFunctions.factorial
})
- expect(result).toBe(false)
+ expect(result).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
--- /dev/null
+'use strict'
+const { isMaster } = require('cluster')
+const { ClusterWorker, KillBehaviors } = require('../../../lib')
+const {
+ jsonIntegerSerialization,
+ factorial,
+ fibonacci
+} = require('../../test-utils')
+
+module.exports = new ClusterWorker(
+ {
+ jsonIntegerSerialization: data => {
+ jsonIntegerSerialization(data.n)
+ return isMaster
+ },
+ factorial: data => factorial(data.n),
+ fibonacci: data => fibonacci(data.n)
+ },
+ {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ }
+)
function test (data) {
data = data || {}
data.function = data.function || WorkerFunctions.jsonIntegerSerialization
- TestUtils.executeWorkerFunction(data)
- return isMaster
+ const result = TestUtils.executeWorkerFunction(data)
+ if (result == null) {
+ return isMaster
+ }
+ return result
}
module.exports = new ClusterWorker(test, {
--- /dev/null
+'use strict'
+const { isMainThread } = require('worker_threads')
+const { ThreadWorker, KillBehaviors } = require('../../../lib')
+const {
+ jsonIntegerSerialization,
+ factorial,
+ fibonacci
+} = require('../../test-utils')
+
+module.exports = new ThreadWorker(
+ {
+ jsonIntegerSerialization: data => {
+ jsonIntegerSerialization(data.n)
+ return isMainThread
+ },
+ factorial: data => factorial(data.n),
+ fibonacci: data => fibonacci(data.n)
+ },
+ {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ }
+)
function test (data) {
data = data || {}
data.function = data.function || WorkerFunctions.jsonIntegerSerialization
- TestUtils.executeWorkerFunction(data)
- return isMainThread
+ const result = TestUtils.executeWorkerFunction(data)
+ if (result == null) {
+ return isMainThread
+ }
+ return result
}
module.exports = new ThreadWorker(test, {