message: MessageValue<Data>
): void
- /**
- * Registers a listener callback on the given worker.
- *
- * @param worker - The worker which should register a listener.
- * @param listener - The message listener callback.
- */
- private registerWorkerMessageListener<Message extends Data | Response>(
- worker: Worker,
- listener: (message: MessageValue<Message>) => void
- ): void {
- worker.on('message', listener as MessageHandler<Worker>)
- }
-
/**
* Creates a new worker.
*
*/
protected abstract createWorker (): Worker
- /**
- * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
- * Can be overridden.
- *
- * @param worker - The newly created worker.
- */
- protected afterWorkerSetup (worker: Worker): void {
- // Listen to worker messages.
- this.registerWorkerMessageListener(worker, this.workerListener())
- // Send startup message to worker.
- this.sendToWorker(worker, {
- ready: false,
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
- })
- // Setup worker task statistics computation.
- this.setWorkerStatistics(worker)
- }
-
/**
* Creates a new worker and sets it up completely in the pool worker nodes.
*
return worker
}
- private redistributeQueuedTasks (workerNodeKey: number): void {
- while (this.tasksQueueSize(workerNodeKey) > 0) {
- let targetWorkerNodeKey: number = workerNodeKey
- let minQueuedTasks = Infinity
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued === 0
- ) {
- targetWorkerNodeKey = workerNodeId
- break
- }
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued < minQueuedTasks
- ) {
- minQueuedTasks = workerNode.usage.tasks.queued
- targetWorkerNodeKey = workerNodeId
- }
- }
- this.enqueueTask(
- targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
- }
-
/**
* Creates a new dynamic worker and sets it up completely in the pool worker nodes.
*
return worker
}
+ /**
+ * Registers a listener callback on the given worker.
+ *
+ * @param worker - The worker which should register a listener.
+ * @param listener - The message listener callback.
+ */
+ private registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ worker.on('message', listener as MessageHandler<Worker>)
+ }
+
+ /**
+ * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+ * Can be overridden.
+ *
+ * @param worker - The newly created worker.
+ */
+ protected afterWorkerSetup (worker: Worker): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
+ }
+
+ private redistributeQueuedTasks (workerNodeKey: number): void {
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ const workerInfo = this.getWorkerInfo(workerNodeId)
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerInfo.ready &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerInfo.ready &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
/**
* This function is the listener registered for each worker message.
*