'poolify',
'readonly',
'resize',
- 'serializable',
'sinon',
'threadjs',
'threadwork',
"autobuild",
"Benoit",
"caffeinate",
+ "cloneable",
"codeql",
"commitlint",
"Dependabot",
"poolify",
"preinstall",
"Quadflieg",
- "serializable",
"Shinigami",
"sonarsource",
"suchmokuo",
#### New type definitions for input data and response
-For cluster worker and worker-thread pools, you can now only send and receive serializable data.
+For cluster worker and worker-thread pools, you can now only send and receive structured-cloneable data.
_This is not a limitation by poolifier but NodeJS._
#### Public property replacements
**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
-Remember that workers can only send and receive serializable data.
+Remember that workers can only send and receive structured-cloneable data.
## Node versions
constructor () {
// eslint-disable-next-line @typescript-eslint/promise-function-async
super((data: MyData) => this.process(data), {
- maxInactiveTime: 60000,
- async: true
+ maxInactiveTime: 60000
})
}
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
- * Setup hook to execute code before worker node are created in the abstract constructor.
- * Can be overridden
+ * Setup hook to execute code before worker nodes are created in the abstract constructor.
+ * Can be overridden.
*
* @virtual
*/
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+ * Can be overridden.
*
* @param worker - The newly created worker.
*/
- private afterWorkerSetup (worker: Worker): void {
+ protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
}
/**
* A cluster pool with a fixed number of workers.
*
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
/**
* A thread pool with a fixed number of threads.
*
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
*
* @typeParam Data - Type of data sent to the worker or execution response. This can only be structured-cloneable data.
* @typeParam ErrorData - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
- * @typeParam MainWorker - Type of main worker.
* @internal
*/
-export interface MessageValue<
- Data = unknown,
- ErrorData = unknown,
- MainWorker = NodeJS.Process | MessagePort
-> extends Task<Data> {
+export interface MessageValue<Data = unknown, ErrorData = unknown>
+ extends Task<Data> {
/**
* Kill code.
*/
* Task performance.
*/
readonly taskPerformance?: TaskPerformance
- /**
- * Reference to main worker.
- */
- readonly parent?: MainWorker
/**
* Whether to compute the given statistics or not.
*/
import { AsyncResource } from 'node:async_hooks'
+import type { Worker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
import { performance } from 'node:perf_hooks'
import type {
* @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
*/
export abstract class AbstractWorker<
- MainWorker extends NodeJS.Process | MessagePort,
+ MainWorker extends Worker | MessagePort,
Data = unknown,
Response = unknown
> extends AsyncResource {
*
* @param message - Message received.
*/
- protected messageListener (
- message: MessageValue<Data, Data, MainWorker>
- ): void {
+ protected messageListener (message: MessageValue<Data, Data>): void {
if (message.id != null && message.data != null) {
// Task message received
const fn = this.getTaskFunction(message.name)
} else {
this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
- } else if (message.parent != null) {
- // Main worker reference message received
- this.mainWorker = message.parent
} else if (message.statistics != null) {
// Statistics message received
this.statistics = message.statistics
-import cluster from 'node:cluster'
+import cluster, { type Worker } from 'node:cluster'
import type { MessageValue } from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
export class ClusterWorker<
Data = unknown,
Response = unknown
-> extends AbstractWorker<NodeJS.Process, Data, Response> {
+> extends AbstractWorker<Worker, Data, Response> {
/**
* Constructs a new poolifier cluster worker.
*
'worker-cluster-pool:poolifier',
cluster.isPrimary,
taskFunctions,
- process,
+ cluster.worker as Worker,
opts
)
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
- const mainWorker = this.getMainWorker()
- if (mainWorker.send == null) {
- throw new Error('Main worker does not support IPC communication')
- }
- mainWorker.send(message)
+ this.getMainWorker().send(message)
}
/** @inheritDoc */
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
}
module.exports = new ClusterWorker(sleep, {
- maxInactiveTime: 500,
- async: true
+ maxInactiveTime: 500
})
module.exports = new ThreadWorker(error, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
- async: true,
killBehavior: KillBehaviors.HARD
})
}
module.exports = new ThreadWorker(sleep, {
- maxInactiveTime: 500,
- async: true
+ maxInactiveTime: 500
})