d524c464ee2d92139bb243b22b5b93749711e451
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { MessageChannel, Worker } from 'node:worker_threads'
2 import cluster from 'node:cluster'
3 import { expect } from 'expect'
4 import { WorkerNode } from '../../lib/pools/worker-node.js'
5 import { WorkerTypes } from '../../lib/index.js'
6 import { CircularArray } from '../../lib/circular-array.js'
7 import { Deque } from '../../lib/deque.js'
8 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
9
10 describe('Worker node test suite', () => {
11 const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
12 const clusterWorker = cluster.fork()
13 const threadWorkerNode = new WorkerNode(threadWorker, 12)
14 const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
15
16 it('Worker node instantiation', () => {
17 expect(() => new WorkerNode()).toThrow(
18 new TypeError('Cannot construct a worker node without a worker')
19 )
20 expect(() => new WorkerNode(threadWorker)).toThrow(
21 new TypeError(
22 'Cannot construct a worker node without a tasks queue back pressure size'
23 )
24 )
25 expect(
26 () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
27 ).toThrow(
28 new TypeError(
29 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
30 )
31 )
32 expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
33 new TypeError(
34 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
35 )
36 )
37 expect(() => new WorkerNode(threadWorker, 0)).toThrow(
38 new RangeError(
39 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
40 )
41 )
42 expect(() => new WorkerNode(threadWorker, -1)).toThrow(
43 new RangeError(
44 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
45 )
46 )
47 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
48 expect(threadWorkerNode.worker).toBe(threadWorker)
49 expect(threadWorkerNode.info).toStrictEqual({
50 id: threadWorker.threadId,
51 type: WorkerTypes.thread,
52 dynamic: false,
53 ready: false
54 })
55 expect(threadWorkerNode.usage).toStrictEqual({
56 tasks: {
57 executed: 0,
58 executing: 0,
59 queued: 0,
60 maxQueued: 0,
61 stolen: 0,
62 failed: 0
63 },
64 runTime: {
65 history: new CircularArray()
66 },
67 waitTime: {
68 history: new CircularArray()
69 },
70 elu: {
71 idle: {
72 history: new CircularArray()
73 },
74 active: {
75 history: new CircularArray()
76 }
77 }
78 })
79 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
80 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
81 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
82 expect(threadWorkerNode.tasksQueue.size).toBe(0)
83 expect(threadWorkerNode.tasksQueueSize()).toBe(
84 threadWorkerNode.tasksQueue.size
85 )
86 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
87 expect(threadWorkerNode.onEmptyQueueCount).toBe(0)
88 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
89
90 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
91 expect(clusterWorkerNode.worker).toBe(clusterWorker)
92 expect(clusterWorkerNode.info).toStrictEqual({
93 id: clusterWorker.id,
94 type: WorkerTypes.cluster,
95 dynamic: false,
96 ready: false
97 })
98 expect(clusterWorkerNode.usage).toStrictEqual({
99 tasks: {
100 executed: 0,
101 executing: 0,
102 queued: 0,
103 maxQueued: 0,
104 stolen: 0,
105 failed: 0
106 },
107 runTime: {
108 history: new CircularArray()
109 },
110 waitTime: {
111 history: new CircularArray()
112 },
113 elu: {
114 idle: {
115 history: new CircularArray()
116 },
117 active: {
118 history: new CircularArray()
119 }
120 }
121 })
122 expect(clusterWorkerNode.messageChannel).toBeUndefined()
123 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
124 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
125 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
126 expect(clusterWorkerNode.tasksQueueSize()).toBe(
127 clusterWorkerNode.tasksQueue.size
128 )
129 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
130 expect(clusterWorkerNode.onEmptyQueueCount).toBe(0)
131 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
132 })
133
134 it('Worker node getTaskFunctionWorkerUsage()', () => {
135 expect(() =>
136 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
137 ).toThrow(
138 new TypeError(
139 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
140 )
141 )
142 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
143 expect(() =>
144 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
145 ).toThrow(
146 new TypeError(
147 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
148 )
149 )
150 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
151 expect(
152 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
153 ).toStrictEqual({
154 tasks: {
155 executed: 0,
156 executing: 0,
157 queued: 0,
158 stolen: 0,
159 failed: 0
160 },
161 runTime: {
162 history: new CircularArray()
163 },
164 waitTime: {
165 history: new CircularArray()
166 },
167 elu: {
168 idle: {
169 history: new CircularArray()
170 },
171 active: {
172 history: new CircularArray()
173 }
174 }
175 })
176 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
177 tasks: {
178 executed: 0,
179 executing: 0,
180 queued: 0,
181 stolen: 0,
182 failed: 0
183 },
184 runTime: {
185 history: new CircularArray()
186 },
187 waitTime: {
188 history: new CircularArray()
189 },
190 elu: {
191 idle: {
192 history: new CircularArray()
193 },
194 active: {
195 history: new CircularArray()
196 }
197 }
198 })
199 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
200 tasks: {
201 executed: 0,
202 executing: 0,
203 queued: 0,
204 stolen: 0,
205 failed: 0
206 },
207 runTime: {
208 history: new CircularArray()
209 },
210 waitTime: {
211 history: new CircularArray()
212 },
213 elu: {
214 idle: {
215 history: new CircularArray()
216 },
217 active: {
218 history: new CircularArray()
219 }
220 }
221 })
222 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
223 })
224
225 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
226 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
227 DEFAULT_TASK_NAME,
228 'fn1',
229 'fn2'
230 ])
231 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
232 expect(
233 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
234 ).toBe(false)
235 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
236 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
237 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
238 })
239 })