fix: fix worker node cross tasks stealing
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
9974369e
JB
1import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
2import { Worker as ClusterWorker } from 'node:cluster'
a074ffee
JB
3import { expect } from 'expect'
4import { WorkerNode } from '../../lib/pools/worker-node.js'
5import { WorkerTypes } from '../../lib/index.js'
6import { CircularArray } from '../../lib/circular-array.js'
7import { Deque } from '../../lib/deque.js'
8import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
26fb3c18
JB
9
10describe('Worker node test suite', () => {
c3719753
JB
11 const threadWorkerNode = new WorkerNode(
12 WorkerTypes.thread,
13 './tests/worker-files/thread/testWorker.mjs',
14 { tasksQueueBackPressureSize: 12 }
15 )
16 const clusterWorkerNode = new WorkerNode(
17 WorkerTypes.cluster,
18 './tests/worker-files/cluster/testWorker.js',
19 { tasksQueueBackPressureSize: 12 }
20 )
26fb3c18
JB
21
22 it('Worker node instantiation', () => {
948faff7 23 expect(() => new WorkerNode()).toThrow(
c3719753 24 new TypeError('Cannot construct a worker node without a worker type')
26fb3c18 25 )
c3719753
JB
26 expect(
27 () =>
28 new WorkerNode(
29 'invalidWorkerType',
30 './tests/worker-files/thread/testWorker.mjs',
31 { tasksQueueBackPressureSize: 12 }
32 )
33 ).toThrow(
34 new TypeError(
35 "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
36 )
37 )
38 expect(
39 () =>
40 new WorkerNode(
41 WorkerTypes.thread,
42 './tests/worker-files/thread/testWorker.mjs'
43 )
44 ).toThrow(
26fb3c18 45 new TypeError(
c3719753 46 'Cannot construct a worker node without worker node options'
26fb3c18
JB
47 )
48 )
49 expect(
c3719753
JB
50 () =>
51 new WorkerNode(
52 WorkerTypes.thread,
53 './tests/worker-files/thread/testWorker.mjs',
54 ''
55 )
948faff7 56 ).toThrow(
26fb3c18 57 new TypeError(
c3719753 58 'Cannot construct a worker node with invalid options: must be a plain object'
26fb3c18
JB
59 )
60 )
c3719753
JB
61 expect(
62 () =>
63 new WorkerNode(
64 WorkerTypes.thread,
65 './tests/worker-files/thread/testWorker.mjs',
66 {}
67 )
68 ).toThrow(
5b49e864 69 new TypeError(
c3719753 70 'Cannot construct a worker node without a tasks queue back pressure size option'
5b49e864
JB
71 )
72 )
c3719753
JB
73 expect(
74 () =>
75 new WorkerNode(
76 WorkerTypes.thread,
77 './tests/worker-files/thread/testWorker.mjs',
78 { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
79 )
80 ).toThrow(
81 new TypeError(
82 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
83 )
84 )
85 expect(
86 () =>
87 new WorkerNode(
88 WorkerTypes.thread,
89 './tests/worker-files/thread/testWorker.mjs',
90 { tasksQueueBackPressureSize: 0.2 }
91 )
92 ).toThrow(
93 new TypeError(
94 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
95 )
96 )
97 expect(
98 () =>
99 new WorkerNode(
100 WorkerTypes.thread,
101 './tests/worker-files/thread/testWorker.mjs',
102 { tasksQueueBackPressureSize: 0 }
103 )
104 ).toThrow(
5b49e864 105 new RangeError(
c3719753 106 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
107 )
108 )
c3719753
JB
109 expect(
110 () =>
111 new WorkerNode(
112 WorkerTypes.thread,
113 './tests/worker-files/thread/testWorker.mjs',
114 { tasksQueueBackPressureSize: -1 }
115 )
116 ).toThrow(
5b49e864 117 new RangeError(
c3719753 118 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
5b49e864
JB
119 )
120 )
75de9f41 121 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 122 expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
75de9f41 123 expect(threadWorkerNode.info).toStrictEqual({
c3719753 124 id: threadWorkerNode.worker.threadId,
26fb3c18
JB
125 type: WorkerTypes.thread,
126 dynamic: false,
5eb72b9e
JB
127 ready: false,
128 stealing: false
26fb3c18 129 })
75de9f41
JB
130 expect(threadWorkerNode.usage).toStrictEqual({
131 tasks: {
132 executed: 0,
133 executing: 0,
134 queued: 0,
135 maxQueued: 0,
463226a4 136 sequentiallyStolen: 0,
75de9f41
JB
137 stolen: 0,
138 failed: 0
139 },
140 runTime: {
4ba4c7f9 141 history: new CircularArray()
75de9f41
JB
142 },
143 waitTime: {
4ba4c7f9 144 history: new CircularArray()
75de9f41
JB
145 },
146 elu: {
147 idle: {
4ba4c7f9 148 history: new CircularArray()
75de9f41
JB
149 },
150 active: {
4ba4c7f9 151 history: new CircularArray()
75de9f41
JB
152 }
153 }
154 })
155 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
156 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
157 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
158 expect(threadWorkerNode.tasksQueue.size).toBe(0)
68f1f531
JB
159 expect(threadWorkerNode.tasksQueueSize()).toBe(
160 threadWorkerNode.tasksQueue.size
161 )
162 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
75de9f41
JB
163 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
164
165 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
9974369e 166 expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
75de9f41 167 expect(clusterWorkerNode.info).toStrictEqual({
c3719753 168 id: clusterWorkerNode.worker.id,
75de9f41
JB
169 type: WorkerTypes.cluster,
170 dynamic: false,
5eb72b9e
JB
171 ready: false,
172 stealing: false
75de9f41
JB
173 })
174 expect(clusterWorkerNode.usage).toStrictEqual({
26fb3c18
JB
175 tasks: {
176 executed: 0,
177 executing: 0,
178 queued: 0,
179 maxQueued: 0,
463226a4 180 sequentiallyStolen: 0,
26fb3c18
JB
181 stolen: 0,
182 failed: 0
183 },
184 runTime: {
4ba4c7f9 185 history: new CircularArray()
26fb3c18
JB
186 },
187 waitTime: {
4ba4c7f9 188 history: new CircularArray()
26fb3c18
JB
189 },
190 elu: {
191 idle: {
4ba4c7f9 192 history: new CircularArray()
26fb3c18
JB
193 },
194 active: {
4ba4c7f9 195 history: new CircularArray()
26fb3c18
JB
196 }
197 }
198 })
75de9f41
JB
199 expect(clusterWorkerNode.messageChannel).toBeUndefined()
200 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
201 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
202 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
68f1f531
JB
203 expect(clusterWorkerNode.tasksQueueSize()).toBe(
204 clusterWorkerNode.tasksQueue.size
205 )
206 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
75de9f41 207 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
26fb3c18
JB
208 })
209
210 it('Worker node getTaskFunctionWorkerUsage()', () => {
211 expect(() =>
75de9f41 212 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 213 ).toThrow(
26fb3c18
JB
214 new TypeError(
215 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
216 )
217 )
66979634 218 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
26fb3c18 219 expect(() =>
75de9f41 220 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 221 ).toThrow(
26fb3c18
JB
222 new TypeError(
223 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
224 )
225 )
66979634 226 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
6cd5248f 227 expect(
75de9f41 228 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
6cd5248f 229 ).toStrictEqual({
26fb3c18
JB
230 tasks: {
231 executed: 0,
232 executing: 0,
233 queued: 0,
463226a4 234 sequentiallyStolen: 0,
5ad42e34 235 stolen: 0,
26fb3c18
JB
236 failed: 0
237 },
238 runTime: {
4ba4c7f9 239 history: new CircularArray()
26fb3c18
JB
240 },
241 waitTime: {
4ba4c7f9 242 history: new CircularArray()
26fb3c18
JB
243 },
244 elu: {
245 idle: {
4ba4c7f9 246 history: new CircularArray()
26fb3c18
JB
247 },
248 active: {
4ba4c7f9 249 history: new CircularArray()
26fb3c18
JB
250 }
251 }
252 })
75de9f41 253 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
26fb3c18
JB
254 tasks: {
255 executed: 0,
256 executing: 0,
257 queued: 0,
463226a4 258 sequentiallyStolen: 0,
5ad42e34 259 stolen: 0,
26fb3c18
JB
260 failed: 0
261 },
262 runTime: {
4ba4c7f9 263 history: new CircularArray()
26fb3c18
JB
264 },
265 waitTime: {
4ba4c7f9 266 history: new CircularArray()
26fb3c18
JB
267 },
268 elu: {
269 idle: {
4ba4c7f9 270 history: new CircularArray()
26fb3c18
JB
271 },
272 active: {
4ba4c7f9 273 history: new CircularArray()
26fb3c18
JB
274 }
275 }
276 })
75de9f41 277 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
26fb3c18
JB
278 tasks: {
279 executed: 0,
280 executing: 0,
281 queued: 0,
463226a4 282 sequentiallyStolen: 0,
5ad42e34 283 stolen: 0,
26fb3c18
JB
284 failed: 0
285 },
286 runTime: {
4ba4c7f9 287 history: new CircularArray()
26fb3c18
JB
288 },
289 waitTime: {
4ba4c7f9 290 history: new CircularArray()
26fb3c18
JB
291 },
292 elu: {
293 idle: {
4ba4c7f9 294 history: new CircularArray()
26fb3c18
JB
295 },
296 active: {
4ba4c7f9 297 history: new CircularArray()
26fb3c18
JB
298 }
299 }
300 })
75de9f41 301 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
26fb3c18 302 })
adee6053
JB
303
304 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
305 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
306 DEFAULT_TASK_NAME,
307 'fn1',
308 'fn2'
309 ])
310 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
311 expect(
312 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
313 ).toBe(false)
314 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
315 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
316 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
317 })
26fb3c18 318})