feat: add pool and worker readiness tracking infrastructure
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
4b628b48
JB
1import { CircularArray } from '../circular-array'
2import { Queue } from '../queue'
3import {
4 type IWorker,
5 type IWorkerNode,
6 type Task,
7 type WorkerInfo,
8 type WorkerType,
9 WorkerTypes,
10 type WorkerUsage
11} from './worker'
12
60664f48
JB
/**
 * Worker node.
 *
 * Bundles a worker with its descriptive information, its usage statistics and
 * a private queue of tasks.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** The worker held by this node. */
  public readonly worker: Worker
  /** Worker information, built once at construction time. */
  public readonly info: WorkerInfo
  /** Worker usage statistics; replaced wholesale by `resetUsage()`. */
  public usage: WorkerUsage
  /** Tasks queue backing the enqueue/dequeue/clear operations of this node. */
  private readonly tasksQueue: Queue<Task<Data>>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @internal
   */
  constructor (worker: Worker, workerType: WorkerType) {
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    this.usage = this.initWorkerUsage()
    this.tasksQueue = new Queue<Task<Data>>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /**
   * Worker node tasks queue maximum size.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    return this.tasksQueue.maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /**
   * Resets the worker usage statistics by replacing them with a freshly
   * initialized usage object.
   */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
  }

  /**
   * Builds the initial worker information.
   *
   * The `dynamic` and `ready` flags both start as `false`.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    return {
      id: this.getWorkerId(worker, workerType),
      type: workerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds a fresh worker usage object.
   *
   * Task counters start at zero and every history starts as a new, empty
   * `CircularArray`. The `queued` and `maxQueued` properties are getters that
   * read the tasks queue each time they are accessed.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Inside the object literal getters below, `this` is the `tasks` object,
    // so the worker node is captured through arrow functions instead.
    const getTasksQueueSize = (): number => {
      return this.tasksQueueSize()
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueueMaxSize()
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker `threadId` for a thread worker, the worker `id` for a
   * cluster worker, `undefined` for any other worker type.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    if (workerType === WorkerTypes.thread) {
      return worker.threadId
    } else if (workerType === WorkerTypes.cluster) {
      return worker.id
    }
    // Unrecognized worker type: no id can be resolved.
    return undefined
  }
}
117
118 /**
119 * Gets the worker id.
120 *
121 * @param worker - The worker.
60664f48 122 * @param workerType - The worker type.
4b628b48
JB
123 * @returns The worker id.
124 */
125 private getWorkerId (
126 worker: Worker,
127 workerType: WorkerType
128 ): number | undefined {
129 if (workerType === WorkerTypes.thread) {
130 return worker.threadId
131 } else if (workerType === WorkerTypes.cluster) {
132 return worker.id
133 }
134 }
135}