5633530f35d6fd0d06c1cae72af18405daabb997
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { Worker as ClusterWorker } from 'node:cluster'
2 import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
3
4 import { expect } from 'expect'
5
6 import { CircularArray } from '../../lib/circular-array.cjs'
7 import { Deque } from '../../lib/deque.cjs'
8 import { WorkerTypes } from '../../lib/index.cjs'
9 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
10 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
11
/**
 * Test suite for WorkerNode: constructor argument validation, the initial
 * state of a thread and a cluster worker node, and the per task function
 * usage accessors.
 *
 * The tests share the two worker nodes created below and are order dependent:
 * the deleteTaskFunctionWorkerUsage() test asserts the task function names and
 * usage entries left behind by the getTaskFunctionWorkerUsage() test.
 */
describe('Worker node test suite', () => {
  const threadWorkerFile = './tests/worker-files/thread/testWorker.mjs'
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.cjs'

  const threadWorkerNode = new WorkerNode(
    WorkerTypes.thread,
    threadWorkerFile,
    { tasksQueueBackPressureSize: 12 }
  )
  const clusterWorkerNode = new WorkerNode(
    WorkerTypes.cluster,
    clusterWorkerFile,
    { tasksQueueBackPressureSize: 12 }
  )

  /**
   * Builds the usage expected for a task function that has not run anything.
   * A fresh object (with fresh CircularArray instances) is returned on every
   * call so expectations never share state.
   *
   * @returns {object} The expected pristine task function usage.
   */
  const emptyTaskFunctionUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  })

  /**
   * Builds the usage expected for a freshly constructed worker node: the task
   * function usage shape plus the `maxQueued` tasks counter.
   *
   * @returns {object} The expected pristine worker node usage.
   */
  const emptyWorkerUsage = () => {
    const usage = emptyTaskFunctionUsage()
    return { ...usage, tasks: { ...usage.tasks, maxQueued: 0 } }
  }

  // The worker nodes above wrap a real worker thread and a real cluster
  // worker (see the instanceof assertions in the instantiation test). Stop
  // them once the suite is done so they do not outlive it.
  after(async () => {
    await threadWorkerNode.worker.terminate()
    clusterWorkerNode.worker.kill()
  })

  it('Worker node instantiation', () => {
    const notAnIntegerError = new TypeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
    )
    const notAPositiveIntegerError = new RangeError(
      'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
    )
    // Each entry is [constructor arguments, error the constructor must throw].
    const invalidConstructions = [
      [
        [],
        new TypeError('Cannot construct a worker node without a worker type')
      ],
      [
        [
          'invalidWorkerType',
          threadWorkerFile,
          { tasksQueueBackPressureSize: 12 }
        ],
        new TypeError(
          "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
        )
      ],
      [
        [WorkerTypes.thread, threadWorkerFile],
        new TypeError(
          'Cannot construct a worker node without worker node options'
        )
      ],
      [
        [WorkerTypes.thread, threadWorkerFile, ''],
        new TypeError(
          'Cannot construct a worker node with invalid options: must be a plain object'
        )
      ],
      [
        [WorkerTypes.thread, threadWorkerFile, {}],
        new TypeError(
          'Cannot construct a worker node without a tasks queue back pressure size option'
        )
      ],
      [
        [
          WorkerTypes.thread,
          threadWorkerFile,
          { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
        ],
        notAnIntegerError
      ],
      [
        [
          WorkerTypes.thread,
          threadWorkerFile,
          { tasksQueueBackPressureSize: 0.2 }
        ],
        notAnIntegerError
      ],
      [
        [
          WorkerTypes.thread,
          threadWorkerFile,
          { tasksQueueBackPressureSize: 0 }
        ],
        notAPositiveIntegerError
      ],
      [
        [
          WorkerTypes.thread,
          threadWorkerFile,
          { tasksQueueBackPressureSize: -1 }
        ],
        notAPositiveIntegerError
      ]
    ]
    for (const [constructorArgs, expectedError] of invalidConstructions) {
      expect(() => new WorkerNode(...constructorArgs)).toThrow(expectedError)
    }

    /**
     * Asserts the state shared by every freshly constructed worker node.
     *
     * @param {WorkerNode} workerNode - The worker node under test.
     * @param {object} expected - What distinguishes this kind of worker node.
     * @param {Function} expected.workerClass - Expected class of `workerNode.worker`.
     * @param {Function} expected.idOf - Reads the expected `info.id` from the worker.
     * @param {string} expected.type - Expected `info.type`.
     */
    const expectPristineWorkerNode = (
      workerNode,
      { workerClass, idOf, type }
    ) => {
      expect(workerNode).toBeInstanceOf(WorkerNode)
      expect(workerNode.worker).toBeInstanceOf(workerClass)
      expect(workerNode.info).toStrictEqual({
        id: idOf(workerNode.worker),
        type,
        dynamic: false,
        ready: false,
        stealing: false
      })
      expect(workerNode.usage).toStrictEqual(emptyWorkerUsage())
      expect(workerNode.tasksQueueBackPressureSize).toBe(12)
      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueueSize()).toBe(workerNode.tasksQueue.size)
      expect(workerNode.onBackPressureStarted).toBe(false)
      expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map)
    }

    expectPristineWorkerNode(threadWorkerNode, {
      workerClass: ThreadWorker,
      idOf: worker => worker.threadId,
      type: WorkerTypes.thread
    })
    // Only the thread worker node carries a message channel.
    expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)

    expectPristineWorkerNode(clusterWorkerNode, {
      workerClass: ClusterWorker,
      idOf: worker => worker.id,
      type: WorkerTypes.cluster
    })
    expect(clusterWorkerNode.messageChannel).toBeUndefined()
  })

  it('Worker node getTaskFunctionWorkerUsage()', () => {
    // No task function names have been assigned to the worker node info yet.
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
      )
    )
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
      )
    )
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
    // Queried in this exact order; every name must report a pristine usage.
    for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
      expect(
        threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(emptyTaskFunctionUsage())
    }
    // NOTE(review): three names were queried but only two usage entries are
    // expected; presumably the default task name shares an entry with another
    // task function - confirm against the WorkerNode implementation.
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
  })

  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
    // Precondition: state left by the getTaskFunctionWorkerUsage() test.
    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'fn1',
      'fn2'
    ])
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
    // Deleting an unknown task function reports false and removes nothing.
    expect(
      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toBe(false)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
  })
})