fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / tests / pools / worker-node.test.mjs
1 import { MessageChannel, Worker } from 'node:worker_threads'
2 import cluster from 'node:cluster'
3 import { expect } from 'expect'
4 import { WorkerNode } from '../../lib/pools/worker-node.js'
5 import { WorkerTypes } from '../../lib/index.js'
6 import { CircularArray } from '../../lib/circular-array.js'
7 import { Deque } from '../../lib/deque.js'
8 import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
9
/**
 * Unit tests for WorkerNode: constructor argument validation, initial state of
 * thread and cluster worker nodes, and task function usage bookkeeping.
 *
 * The worker nodes are created once and shared by all tests, and the tests are
 * order-dependent: the deleteTaskFunctionWorkerUsage() test relies on the task
 * function names and usage entries left by the getTaskFunctionWorkerUsage()
 * test.
 */
describe('Worker node test suite', () => {
  const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
  const clusterWorker = cluster.fork()
  const threadWorkerNode = new WorkerNode(threadWorker, 12)
  const clusterWorkerNode = new WorkerNode(clusterWorker, 12)

  /**
   * Builds the measurement statistics expected in a usage object that has not
   * recorded anything yet.
   *
   * @returns {object} Fresh runTime, waitTime and elu statistics.
   */
  const buildPristineMeasurements = () => ({
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  })

  /**
   * Builds the usage expected from a freshly constructed worker node.
   *
   * @returns {object} Expected worker usage.
   */
  const buildPristineWorkerUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      sequentiallyStolen: 0,
      stolen: 0,
      failed: 0
    },
    ...buildPristineMeasurements()
  })

  /**
   * Builds the usage expected for a task function that has not run yet.
   * Unlike the worker usage, it carries no `maxQueued` counter.
   *
   * @returns {object} Expected task function worker usage.
   */
  const buildPristineTaskFunctionUsage = () => ({
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      stolen: 0,
      sequentiallyStolen: 0,
      failed: 0
    },
    ...buildPristineMeasurements()
  })

  // Release the worker thread and the forked process created above so they do
  // not outlive the suite and keep the test runner process alive.
  after(async () => {
    await threadWorker.terminate()
    clusterWorker.kill()
  })

  it('Worker node instantiation', () => {
    const notIntegerMessage =
      'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
    const notPositiveIntegerMessage =
      'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'

    // Constructor argument validation
    expect(() => new WorkerNode()).toThrow(
      new TypeError('Cannot construct a worker node without a worker')
    )
    expect(() => new WorkerNode(threadWorker)).toThrow(
      new TypeError(
        'Cannot construct a worker node without a tasks queue back pressure size'
      )
    )
    for (const invalidSize of ['invalidTasksQueueBackPressureSize', 0.2]) {
      expect(() => new WorkerNode(threadWorker, invalidSize)).toThrow(
        new TypeError(notIntegerMessage)
      )
    }
    for (const invalidSize of [0, -1]) {
      expect(() => new WorkerNode(threadWorker, invalidSize)).toThrow(
        new RangeError(notPositiveIntegerMessage)
      )
    }

    // Initial state of a thread worker node
    expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
    expect(threadWorkerNode.worker).toBe(threadWorker)
    expect(threadWorkerNode.info).toStrictEqual({
      id: threadWorker.threadId,
      type: WorkerTypes.thread,
      dynamic: false,
      ready: false
    })
    expect(threadWorkerNode.usage).toStrictEqual(buildPristineWorkerUsage())
    expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
    expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(threadWorkerNode.tasksQueue.size).toBe(0)
    expect(threadWorkerNode.tasksQueueSize()).toBe(
      threadWorkerNode.tasksQueue.size
    )
    expect(threadWorkerNode.onBackPressureStarted).toBe(false)
    expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)

    // Initial state of a cluster worker node (no message channel expected)
    expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
    expect(clusterWorkerNode.worker).toBe(clusterWorker)
    expect(clusterWorkerNode.info).toStrictEqual({
      id: clusterWorker.id,
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: false
    })
    expect(clusterWorkerNode.usage).toStrictEqual(buildPristineWorkerUsage())
    expect(clusterWorkerNode.messageChannel).toBeUndefined()
    expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
    expect(clusterWorkerNode.tasksQueue.size).toBe(0)
    expect(clusterWorkerNode.tasksQueueSize()).toBe(
      clusterWorkerNode.tasksQueue.size
    )
    expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
    expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
  })

  it('Worker node getTaskFunctionWorkerUsage()', () => {
    // Task function names list not defined yet
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
      )
    )

    // Task function names list too short
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
    expect(() =>
      threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toThrow(
      new TypeError(
        "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
      )
    )

    // Valid task function names list
    threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
    for (const taskFunctionName of [DEFAULT_TASK_NAME, 'fn1', 'fn2']) {
      expect(
        threadWorkerNode.getTaskFunctionWorkerUsage(taskFunctionName)
      ).toStrictEqual(buildPristineTaskFunctionUsage())
    }
    // Three names were queried but only two usage entries are expected
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
  })

  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
    // Preconditions inherited from the getTaskFunctionWorkerUsage() test
    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
      DEFAULT_TASK_NAME,
      'fn1',
      'fn2'
    ])
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)

    // Deleting an unknown task function is reported and changes nothing
    expect(
      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
    ).toBe(false)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)

    // Deleting a known task function removes its usage entry
    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
  })
})