fix: fix async task statistics accounting
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
4b628b48
JB
1import { CircularArray } from '../circular-array'
2import { Queue } from '../queue'
3import {
4 type IWorker,
5 type IWorkerNode,
6 type Task,
7 type WorkerInfo,
8 type WorkerType,
9 WorkerTypes,
10 type WorkerUsage
11} from './worker'
12
60664f48
JB
13/**
14 * Worker node.
15 *
16 * @typeParam Worker - Type of worker.
17 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
18 */
4b628b48
JB
19export class WorkerNode<Worker extends IWorker, Data = unknown>
20implements IWorkerNode<Worker, Data> {
21 public readonly worker: Worker
22 public readonly info: WorkerInfo
23 public usage: WorkerUsage
ff128cc9 24 private readonly tasksUsage: Map<string, WorkerUsage>
4b628b48
JB
25 private readonly tasksQueue: Queue<Task<Data>>
26
60664f48
JB
27 /**
28 * Constructs a new worker node.
29 *
30 * @param worker - The worker.
31 * @param workerType - The worker type.
60664f48 32 */
4b628b48
JB
33 constructor (worker: Worker, workerType: WorkerType) {
34 this.worker = worker
35 this.info = this.initWorkerInfo(worker, workerType)
36 this.usage = this.initWorkerUsage()
ff128cc9 37 this.tasksUsage = new Map<string, WorkerUsage>()
4b628b48
JB
38 this.tasksQueue = new Queue<Task<Data>>()
39 }
40
41 /** @inheritdoc */
42 public tasksQueueSize (): number {
43 return this.tasksQueue.size
44 }
45
46 /**
47 * Worker node tasks queue maximum size.
48 *
49 * @returns The tasks queue maximum size.
50 */
51 private tasksQueueMaxSize (): number {
52 return this.tasksQueue.maxSize
53 }
54
55 /** @inheritdoc */
56 public enqueueTask (task: Task<Data>): number {
57 return this.tasksQueue.enqueue(task)
58 }
59
60 /** @inheritdoc */
61 public dequeueTask (): Task<Data> | undefined {
62 return this.tasksQueue.dequeue()
63 }
64
65 /** @inheritdoc */
66 public clearTasksQueue (): void {
67 this.tasksQueue.clear()
68 }
69
ff128cc9 70 /** @inheritdoc */
4b628b48
JB
71 public resetUsage (): void {
72 this.usage = this.initWorkerUsage()
ff128cc9
JB
73 this.tasksUsage.clear()
74 }
75
76 /** @inheritdoc */
77 public getTasksWorkerUsage (name: string): WorkerUsage | undefined {
78 if (!this.tasksUsage.has(name)) {
79 this.tasksUsage.set(name, this.initWorkerUsage())
80 }
81 return this.tasksUsage.get(name)
4b628b48
JB
82 }
83
84 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
85 return {
86 id: this.getWorkerId(worker, workerType),
87 type: workerType,
88 dynamic: false,
2431bdb4 89 ready: false
4b628b48
JB
90 }
91 }
92
93 private initWorkerUsage (): WorkerUsage {
94 const getTasksQueueSize = (): number => {
95 return this.tasksQueueSize()
96 }
bf4ef2ca 97 const getTasksQueueMaxSize = (): number => {
4b628b48
JB
98 return this.tasksQueueMaxSize()
99 }
100 return {
101 tasks: {
102 executed: 0,
103 executing: 0,
104 get queued (): number {
105 return getTasksQueueSize()
106 },
107 get maxQueued (): number {
bf4ef2ca 108 return getTasksQueueMaxSize()
4b628b48
JB
109 },
110 failed: 0
111 },
112 runTime: {
113 history: new CircularArray()
114 },
115 waitTime: {
116 history: new CircularArray()
117 },
118 elu: {
119 idle: {
120 history: new CircularArray()
121 },
122 active: {
123 history: new CircularArray()
124 }
125 }
126 }
127 }
128
129 /**
130 * Gets the worker id.
131 *
132 * @param worker - The worker.
60664f48 133 * @param workerType - The worker type.
4b628b48
JB
134 * @returns The worker id.
135 */
136 private getWorkerId (
137 worker: Worker,
138 workerType: WorkerType
139 ): number | undefined {
140 if (workerType === WorkerTypes.thread) {
141 return worker.threadId
142 } else if (workerType === WorkerTypes.cluster) {
143 return worker.id
144 }
145 }
146}