fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
a074ffee
JB
1import { MessageChannel, Worker } from 'node:worker_threads'
2import cluster from 'node:cluster'
3import { expect } from 'expect'
4import { WorkerNode } from '../../lib/pools/worker-node.js'
5import { WorkerTypes } from '../../lib/index.js'
6import { CircularArray } from '../../lib/circular-array.js'
7import { Deque } from '../../lib/deque.js'
8import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
26fb3c18
JB
9
describe('Worker node test suite', () => {
  // Fixtures shared by every test of the suite. They are created in the
  // `before` hook (not at suite definition time) and released in `after`.
  let threadWorker
  let clusterWorker
  let threadWorkerNode
  let clusterWorkerNode

  /**
   * Builds the measurement statistics expected on a usage object for which
   * nothing has been recorded yet: only empty histories.
   *
   * A fresh object is returned on each call so expectations never share state.
   *
   * @returns {object} Expected `runTime`, `waitTime` and `elu` statistics.
   */
  const expectedEmptyStatistics = () => ({
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  })

  /**
   * Builds the `usage` expected on a freshly constructed worker node.
   *
   * @returns {object} Expected worker usage.
   */
  const expectedEmptyWorkerUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    ...expectedEmptyStatistics()
  })

  /**
   * Builds the usage expected from `getTaskFunctionWorkerUsage()` for a task
   * function that has not been used yet. Unlike the worker usage, it carries
   * no `maxQueued` counter.
   *
   * @returns {object} Expected task function worker usage.
   */
  const expectedEmptyTaskFunctionWorkerUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      stolen: 0,
      sequentiallyStolen: 0,
      failed: 0
    },
    ...expectedEmptyStatistics()
  })

  before(() => {
    threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
    clusterWorker = cluster.fork()
    threadWorkerNode = new WorkerNode(threadWorker, 12)
    clusterWorkerNode = new WorkerNode(clusterWorker, 12)
  })

  after(async () => {
    // Release the worker thread and the forked process so they do not outlive
    // the suite. Optional chaining covers a `before` hook that failed midway.
    await threadWorker?.terminate()
    clusterWorker?.kill()
  })

  it('Worker node instantiation', () => {
    // Constructor arguments validation
    expect(() => new WorkerNode()).toThrow(
      new TypeError('Cannot construct a worker node without a worker')
    )
    expect(() => new WorkerNode(threadWorker)).toThrow(
      new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    )
    expect(
      () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
    ).toThrow(
      new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    )
    expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
      new TypeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
      )
    )
    expect(() => new WorkerNode(threadWorker, 0)).toThrow(
      new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    )
    expect(() => new WorkerNode(threadWorker, -1)).toThrow(
      new RangeError(
        'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
      )
    )

    // Initial state of a worker node wrapping a worker thread
    expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
    expect(threadWorkerNode.worker).toBe(threadWorker)
    expect(threadWorkerNode.info).toStrictEqual({
      id: threadWorker.threadId,
      type: WorkerTypes.thread,
      dynamic: false,
      ready: false
    })
    expect(threadWorkerNode.usage).toStrictEqual(expectedEmptyWorkerUsage())
    expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
    expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(threadWorkerNode.tasksQueue.size).toBe(0)
    expect(threadWorkerNode.tasksQueueSize()).toBe(
      threadWorkerNode.tasksQueue.size
    )
    expect(threadWorkerNode.onBackPressureStarted).toBe(false)
    expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)

    // Initial state of a worker node wrapping a cluster worker
    expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
    expect(clusterWorkerNode.worker).toBe(clusterWorker)
    expect(clusterWorkerNode.info).toStrictEqual({
      id: clusterWorker.id,
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: false
    })
    expect(clusterWorkerNode.usage).toStrictEqual(expectedEmptyWorkerUsage())
    expect(clusterWorkerNode.messageChannel).toBeUndefined()
    expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(clusterWorkerNode.tasksQueue.size).toBe(0)
    expect(clusterWorkerNode.tasksQueueSize()).toBe(
      clusterWorkerNode.tasksQueue.size
    )
    expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
    expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
  })

  it('Worker node getTaskFunctionWorkerUsage()', () => {
    // No task function names list defined yet
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
      )
    )
    // Task function names list that is too short
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
      )
    )
    // Valid task function names list
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
    expect(
      threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
    ).toStrictEqual(expectedEmptyTaskFunctionWorkerUsage())
    expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual(
      expectedEmptyTaskFunctionWorkerUsage()
    )
    expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual(
      expectedEmptyTaskFunctionWorkerUsage()
    )
    // Three names were looked up but only two usage entries are expected
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
  })

  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
    // Relies on the state left on the thread worker node by the previous test
    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'fn1',
      'fn2'
    ])
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
    expect(
      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toBe(false)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
  })
})