round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions,
- type WorkerType,
- WorkerTypes
+ type TasksQueueOptions
} from './pool'
import type {
IWorker,
+ IWorkerNode,
MessageHandler,
Task,
WorkerInfo,
- WorkerNode,
+ WorkerType,
WorkerUsage
} from './worker'
import {
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { version } from './version'
+import { WorkerNode } from './worker-node'
/**
* Base class that implements some shared logic for all poolifier pools.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+ public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(
- workerNode,
- this.getInitialWorkerUsage(workerNode.worker)
- )
+ workerNode.resetUsage()
this.setWorkerStatistics(workerNode.worker)
}
}
void (this.destroyWorker(worker) as Promise<void>)
}
})
- this.sendToWorker(worker, { dynamic: true })
+ this.sendToWorker(worker, { checkAlive: true })
return worker
}
}
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
- // Worker started message received
const worker = this.getWorkerById(message.workerId as number)
if (worker != null) {
- this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+ this.getWorkerInfo(this.getWorkerNodeKey(worker)).started =
message.started as boolean
} else {
throw new Error(
}
}
- /**
- * Sets the given worker node its tasks usage in the pool.
- *
- * @param workerNode - The worker node.
- * @param workerUsage - The worker usage.
- */
- private setWorkerNodeTasksUsage (
- workerNode: WorkerNode<Worker, Data>,
- workerUsage: WorkerUsage
- ): void {
- workerNode.usage = workerUsage
- }
-
/**
* Gets the worker information.
*
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- this.workerNodes.push({
- worker,
- info: this.getInitialWorkerInfo(worker),
- usage: this.getInitialWorkerUsage(),
- tasksQueue: new Queue<Task<Data>>()
- })
- this.setWorkerNodeTasksUsage(
- this.workerNodes[this.getWorkerNodeKey(worker)],
- this.getInitialWorkerUsage(worker)
- )
- return this.workerNodes.length
+ return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
- /**
- * Gets the worker id.
- *
- * @param worker - The worker.
- * @returns The worker id.
- */
- private getWorkerId (worker: Worker): number | undefined {
- if (this.worker === WorkerTypes.thread) {
- return worker.threadId
- } else if (this.worker === WorkerTypes.cluster) {
- return worker.id
- }
- }
-
- // /**
- // * Sets the given worker in the pool worker nodes.
- // *
- // * @param workerNodeKey - The worker node key.
- // * @param worker - The worker.
- // * @param workerInfo - The worker info.
- // * @param workerUsage - The worker usage.
- // * @param tasksQueue - The worker task queue.
- // */
- // private setWorkerNode (
- // workerNodeKey: number,
- // worker: Worker,
- // workerInfo: WorkerInfo,
- // workerUsage: WorkerUsage,
- // tasksQueue: Queue<Task<Data>>
- // ): void {
- // this.workerNodes[workerNodeKey] = {
- // worker,
- // info: workerInfo,
- // usage: workerUsage,
- // tasksQueue
- // }
- // }
-
/**
* Removes the given worker from the pool worker nodes.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ return this.workerNodes[workerNodeKey].enqueueTask(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.size
- }
-
- private tasksMaxQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
}
private flushTasksQueue (workerNodeKey: number): void {
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
- this.workerNodes[workerNodeKey].tasksQueue.clear()
+ this.workerNodes[workerNodeKey].clearTasksQueue()
}
private flushTasksQueues (): void {
}
})
}
-
- private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
- const getTasksQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksQueueSize(this.getWorkerNodeKey(worker))
- }
- const getTasksMaxQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
- }
- return {
- tasks: {
- executed: 0,
- executing: 0,
- get queued (): number {
- return getTasksQueueSize(worker)
- },
- get maxQueued (): number {
- return getTasksMaxQueueSize(worker)
- },
- failed: 0
- },
- runTime: {
- history: new CircularArray()
- },
- waitTime: {
- history: new CircularArray()
- },
- elu: {
- idle: {
- history: new CircularArray()
- },
- active: {
- history: new CircularArray()
- }
- }
- }
- }
-
- private getInitialWorkerInfo (worker: Worker): WorkerInfo {
- return { id: this.getWorkerId(worker), dynamic: false, started: true }
- }
}