const messageId = ++this.nextMessageId
const res = this.internalExecute(worker, messageId)
this.checkAndEmitBusy()
- this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
+ data = data ?? ({} as Data)
+ this.sendToWorker(worker, { data, id: messageId })
return res
}
* Options for the worker.
*/
public readonly opts: WorkerOptions
-
/**
* Constructs a new poolifier worker.
*
}
this.mainWorker?.on('message', (value: MessageValue<Data, MainWorker>) => {
- if (value?.data && value.id) {
- // Here you will receive messages
- if (this.opts.async) {
- this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
- } else {
- this.runInAsyncScope(this.run.bind(this), this, fn, value)
- }
- } else if (value.parent) {
- // Save a reference of the main worker to communicate with it
- // This will be received once
- this.mainWorker = value.parent
- } else if (value.kill) {
- // Here is time to kill this worker, just clearing the interval
- if (this.aliveInterval) clearInterval(this.aliveInterval)
- this.emitDestroy()
- }
+ this.messageListener(value, fn)
})
}
+ protected messageListener (
+ value: MessageValue<Data, MainWorker>,
+ fn: (data: Data) => Response
+ ): void {
+ if (value.data !== undefined && value.id !== undefined) {
+ // Here you will receive messages
+ if (this.opts.async) {
+ this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+ } else {
+ this.runInAsyncScope(this.run.bind(this), this, fn, value)
+ }
+ } else if (value.parent !== undefined) {
+ // Save a reference of the main worker to communicate with it
+ // This will be received once
+ this.mainWorker = value.parent
+ } else if (value.kill !== undefined) {
+ // Here is time to kill this worker, just clearing the interval
+ if (this.aliveInterval) clearInterval(this.aliveInterval)
+ this.emitDestroy()
+ }
+ }
+
private checkWorkerOptions (opts: WorkerOptions) {
this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
this.opts.maxInactiveTime =
pool.destroy()
})
- it("Verify that pool event emitter 'busy' event can register a callback", () => {
+ it("Verify that pool event emitter 'busy' event can register a callback", async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.push(pool.execute({ test: 'test' }))
}
+ await Promise.all(promises)
// The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
expect(poolBusy).toBe(numberOfWorkers + 1)
})
it('Shutdown test', async () => {
- let closedThreads = 0
- pool.workers.forEach(w => {
- w.on('exit', () => {
- closedThreads++
- })
- })
+ const exitPromise = TestUtils.waitExits(pool, min)
await pool.destroy()
- expect(closedThreads).toBe(min)
+ const numberOfExitEvents = await exitPromise
+ expect(numberOfExitEvents).toBe(min)
})
it('Validation of inputs test', () => {
}
static async workerSleepFunction (data, ms) {
- return new Promise((resolve, reject) => {
+ return new Promise(resolve => {
setTimeout(() => resolve(data), ms)
})
}
const TestUtils = require('../../test-utils')
async function sleep (data) {
- return TestUtils.workerSleepFunction(data, 2000)
+ return await TestUtils.workerSleepFunction(data, 2000)
}
module.exports = new ClusterWorker(sleep, {
const TestUtils = require('../../test-utils')
async function sleep (data) {
- return TestUtils.workerSleepFunction(data, 50000)
+ return await TestUtils.workerSleepFunction(data, 50000)
}
module.exports = new ClusterWorker(sleep, {
const TestUtils = require('../../test-utils')
async function sleep (data) {
- return TestUtils.workerSleepFunction(data, 50000)
+ return await TestUtils.workerSleepFunction(data, 50000)
}
module.exports = new ClusterWorker(sleep, {
const TestUtils = require('../../test-utils')
async function sleep (data) {
- return TestUtils.workerSleepFunction(data, 2000)
+ return await TestUtils.workerSleepFunction(data, 2000)
}
module.exports = new ThreadWorker(sleep, {
const TestUtils = require('../../test-utils')
async function sleep (data) {
- return TestUtils.workerSleepFunction(data, 50000)
+ return await TestUtils.workerSleepFunction(data, 50000)
}
module.exports = new ThreadWorker(sleep, {
'use strict'
const { ThreadWorker } = require('../../../lib/index')
+const TestUtils = require('../../test-utils')
async function sleep (data) {
- return new Promise((resolve, reject) => {
- setTimeout(() => resolve(data), 50000)
- })
+ return await TestUtils.workerSleepFunction(data, 50000)
}
module.exports = new ThreadWorker(sleep, {
'use strict'
const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
const { isMainThread } = require('worker_threads')
+const TestUtils = require('../../test-utils')
function test (data) {
- for (let i = 0; i < 50; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
+ TestUtils.jsonIntegerSerialization(50)
return isMainThread
}