export { ClusterWorker } from './worker/cluster-worker'
export { ThreadWorker } from './worker/thread-worker'
export type { WorkerOptions } from './worker/worker-options'
+export { killBehaviorEnumeration } from './worker/worker-options'
import type { JSONValue } from '../../utility-types'
import type { ClusterPoolOptions } from './fixed'
import { FixedClusterPool } from './fixed'
+import { killBehaviorEnumeration } from '../../worker/worker-options'
/**
* A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
const worker = this.createAndSetupWorker()
this.registerWorkerMessageListener<Data>(worker, message => {
const tasksInProgress = this.tasks.get(worker)
- if (message.kill && tasksInProgress === 0) {
+ const isKillBehaviorOptionHard =
+ message.kill === killBehaviorEnumeration.HARD
+ if (isKillBehaviorOptionHard || tasksInProgress === 0) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- // To handle the case of a long-running task we will check if there is any active task
this.sendToWorker(worker, { kill: 1 })
void this.destroyWorker(worker)
}
import type { PoolOptions } from '../abstract-pool'
import type { ThreadWorkerWithMessageChannel } from './fixed'
import { FixedThreadPool } from './fixed'
+import { killBehaviorEnumeration } from '../../worker/worker-options'
/**
* A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
const worker = this.createAndSetupWorker()
this.registerWorkerMessageListener<Data>(worker, message => {
const tasksInProgress = this.tasks.get(worker)
- if (message.kill && tasksInProgress === 0) {
+ const isKillBehaviorOptionHard =
+ message.kill === killBehaviorEnumeration.HARD
+ if (isKillBehaviorOptionHard || tasksInProgress === 0) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- // To handle the case of a long-running task we will check if there is any active task
this.sendToWorker(worker, { kill: 1 })
void this.destroyWorker(worker)
}
/**
* Kill code.
*/
- readonly kill?: number
+ readonly kill?: string | number
/**
* Error.
*/
import type { MessagePort } from 'worker_threads'
import type { MessageValue } from '../utility-types'
import type { WorkerOptions } from './worker-options'
+import { killBehaviorEnumeration } from './worker-options'
+
+const defaultMaxInactiveTime = 1000 * 60
+const defaultKillBehavior = killBehaviorEnumeration.SOFT
/**
* Base class containing some shared logic for all poolifier workers.
* The maximum time to keep this worker alive while idle. The pool automatically checks and terminates this worker when the time expires.
*/
protected readonly maxInactiveTime: number
+ /**
+ * The kill behavior set as option on the Worker constructor or a default value.
+ */
+ protected readonly killBehavior: string
/**
* Whether the worker is working asynchronously or not.
*/
isMain: boolean,
fn: (data: Data) => Response,
protected mainWorker?: MainWorker | null,
- public readonly opts: WorkerOptions = {}
+ public readonly opts: WorkerOptions = {
+ killBehavior: defaultKillBehavior,
+ maxInactiveTime: defaultMaxInactiveTime
+ }
) {
super(type)
-
- this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+ this.killBehavior = this.opts.killBehavior ?? defaultKillBehavior
+ this.maxInactiveTime = this.opts.maxInactiveTime ?? defaultMaxInactiveTime
this.async = !!this.opts.async
this.lastTask = Date.now()
if (!fn) throw new Error('fn parameter is mandatory')
*/
protected checkAlive (): void {
if (Date.now() - this.lastTask > this.maxInactiveTime) {
- this.sendToMainWorker({ kill: 1 })
+ this.sendToMainWorker({ kill: this.killBehavior })
}
}
+/**
+ * Kill behavior enumeration
+ */
+export const killBehaviorEnumeration = Object.freeze({
+ SOFT: 'SOFT',
+ HARD: 'HARD'
+})
+
/**
* Options for workers.
*/
export interface WorkerOptions {
/**
* Maximum waiting time in milliseconds for tasks.
- *
* After this time, newly created workers will be terminated.
+ * The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task.
+ * If killBehavior is set to HARD this value represents also the timeout for the tasks that you submit to the pool,
+ * when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool.
+ * If killBehavior is set to SOFT your tasks have no timeout and your workers will not be terminated until your task is finished.
*
* @default 60.000 ms
*/
* @default false
*/
async?: boolean
+ /**
+ * killBehavior dictates if your async unit ( worker/process ) will be deleted in case that a task is active on it.
+ * SOFT: If current time - last active time is greater than maxInactiveTime option, but a task is still running then the worker will be not deleted.
+ * HARD: If last active time is greater than maxInactiveTime option, but a task is still running then the worker will be deleted.
+ * This option only apply to the newly created workers.
+ *
+ * @default SOFT
+ */
+ killBehavior?: string
}
expect(res).toBeFalsy()
})
- it('Verify scale processes up and down is working when long running task is used', async () => {
+ it('Verify scale processes up and down is working when long running task is used:hard', async () => {
const longRunningPool = new DynamicClusterPool(
min,
max,
- './tests/worker/cluster/longRunningWorker.js'
+ './tests/worker/cluster/longRunningWorkerHardBehavior.js'
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 3000))
+ // Here we expect the workers to be at the max size since that the task is still running
+ expect(longRunningPool.workers.length).toBe(min)
+ })
+
+ it('Verify scale processes up and down is working when long running task is used:soft', async () => {
+ const longRunningPool = new DynamicClusterPool(
+ min,
+ max,
+ './tests/worker/cluster/longRunningWorkerSoftBehavior.js'
)
expect(longRunningPool.workers.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
expect(res).toBeFalsy()
})
- it('Verify scale thread up and down is working when long running task is used', async () => {
+ it('Verify scale thread up and down is working when long running task is used:hard', async () => {
const longRunningPool = new DynamicThreadPool(
min,
max,
- './tests/worker/thread/longRunningWorker.js',
+ './tests/worker/thread/longRunningWorkerHardBehavior.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 1000))
+ expect(longRunningPool.workers.length).toBe(min)
+ })
+
+ it('Verify scale thread up and down is working when long running task is used:soft', async () => {
+ const longRunningPool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker/thread/longRunningWorkerSoftBehavior.js',
{
errorHandler: e => console.error(e),
onlineHandler: () => console.log('worker is online')
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
async function error (data) {
return new Promise((resolve, reject) => {
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
- async: true
+ async: true,
+ killBehavior: killBehaviorEnumeration
})
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
async function sleep (data) {
return new Promise((resolve, reject) => {
})
}
-module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ClusterWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
function echo (data) {
return data
}
-module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(echo, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
function test (data) {}
-module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
function error (data) {
throw new Error('Error Message from ClusterWorker')
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
- async: false
+ async: false,
+ killBehavior: killBehaviorEnumeration
})
--- /dev/null
+'use strict'
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
+
+async function sleep (data) {
+ return new Promise((resolve, reject) => {
+ setTimeout(() => resolve(data), 50000)
+ })
+}
+
+module.exports = new ClusterWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: killBehaviorEnumeration.HARD
+})
})
}
-module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ClusterWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true
+})
'use strict'
-const { ClusterWorker } = require('../../../lib/index')
+const { ClusterWorker, killBehaviorEnumeration } = require('../../../lib/index')
const { isMaster } = require('cluster')
function test (data) {
return isMaster
}
-module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+module.exports = new ClusterWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
async function sleep (data) {
return new Promise((resolve, reject) => {
})
}
-module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ThreadWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
function echo (data) {
return data
}
-module.exports = new ThreadWorker(echo, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(echo, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
function test (data) {}
-module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
function error (data) {
throw new Error(data)
}
-module.exports = new ThreadWorker(error, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(error, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})
--- /dev/null
+'use strict'
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
+
+async function sleep (data) {
+ return new Promise((resolve, reject) => {
+ setTimeout(() => resolve(data), 50000)
+ })
+}
+
+module.exports = new ThreadWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: killBehaviorEnumeration.HARD
+})
})
}
-module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
+module.exports = new ThreadWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true
+})
'use strict'
-const { ThreadWorker } = require('../../../lib/index')
+const { ThreadWorker, killBehaviorEnumeration } = require('../../../lib/index')
const { isMainThread } = require('worker_threads')
function test (data) {
return isMainThread
}
-module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+module.exports = new ThreadWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: killBehaviorEnumeration.HARD
+})