feat: fire tasks stealing at worker node idling
[poolifier.git] / tests / pools / worker-node.test.mjs
CommitLineData
a074ffee
JB
1import { MessageChannel, Worker } from 'node:worker_threads'
2import cluster from 'node:cluster'
3import { expect } from 'expect'
4import { WorkerNode } from '../../lib/pools/worker-node.js'
5import { WorkerTypes } from '../../lib/index.js'
6import { CircularArray } from '../../lib/circular-array.js'
7import { Deque } from '../../lib/deque.js'
8import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
26fb3c18
JB
9
10describe('Worker node test suite', () => {
b2fd3f4a 11 const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
75de9f41
JB
12 const clusterWorker = cluster.fork()
13 const threadWorkerNode = new WorkerNode(threadWorker, 12)
14 const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
26fb3c18
JB
15
16 it('Worker node instantiation', () => {
948faff7 17 expect(() => new WorkerNode()).toThrow(
26fb3c18
JB
18 new TypeError('Cannot construct a worker node without a worker')
19 )
948faff7 20 expect(() => new WorkerNode(threadWorker)).toThrow(
26fb3c18
JB
21 new TypeError(
22 'Cannot construct a worker node without a tasks queue back pressure size'
23 )
24 )
25 expect(
75de9f41 26 () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
948faff7 27 ).toThrow(
26fb3c18
JB
28 new TypeError(
29 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
30 )
31 )
948faff7 32 expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
5b49e864
JB
33 new TypeError(
34 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
35 )
36 )
948faff7 37 expect(() => new WorkerNode(threadWorker, 0)).toThrow(
5b49e864
JB
38 new RangeError(
39 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
40 )
41 )
948faff7 42 expect(() => new WorkerNode(threadWorker, -1)).toThrow(
5b49e864
JB
43 new RangeError(
44 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
45 )
46 )
75de9f41
JB
47 expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
48 expect(threadWorkerNode.worker).toBe(threadWorker)
49 expect(threadWorkerNode.info).toStrictEqual({
50 id: threadWorker.threadId,
26fb3c18
JB
51 type: WorkerTypes.thread,
52 dynamic: false,
53 ready: false
54 })
75de9f41
JB
55 expect(threadWorkerNode.usage).toStrictEqual({
56 tasks: {
57 executed: 0,
58 executing: 0,
59 queued: 0,
60 maxQueued: 0,
61 stolen: 0,
62 failed: 0
63 },
64 runTime: {
4ba4c7f9 65 history: new CircularArray()
75de9f41
JB
66 },
67 waitTime: {
4ba4c7f9 68 history: new CircularArray()
75de9f41
JB
69 },
70 elu: {
71 idle: {
4ba4c7f9 72 history: new CircularArray()
75de9f41
JB
73 },
74 active: {
4ba4c7f9 75 history: new CircularArray()
75de9f41
JB
76 }
77 }
78 })
79 expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
80 expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
81 expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
82 expect(threadWorkerNode.tasksQueue.size).toBe(0)
68f1f531
JB
83 expect(threadWorkerNode.tasksQueueSize()).toBe(
84 threadWorkerNode.tasksQueue.size
85 )
86 expect(threadWorkerNode.onBackPressureStarted).toBe(false)
65542a35 87 expect(threadWorkerNode.onIdleWorkerNodeCount).toBe(0)
75de9f41
JB
88 expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
89
90 expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
91 expect(clusterWorkerNode.worker).toBe(clusterWorker)
92 expect(clusterWorkerNode.info).toStrictEqual({
93 id: clusterWorker.id,
94 type: WorkerTypes.cluster,
95 dynamic: false,
96 ready: false
97 })
98 expect(clusterWorkerNode.usage).toStrictEqual({
26fb3c18
JB
99 tasks: {
100 executed: 0,
101 executing: 0,
102 queued: 0,
103 maxQueued: 0,
104 stolen: 0,
105 failed: 0
106 },
107 runTime: {
4ba4c7f9 108 history: new CircularArray()
26fb3c18
JB
109 },
110 waitTime: {
4ba4c7f9 111 history: new CircularArray()
26fb3c18
JB
112 },
113 elu: {
114 idle: {
4ba4c7f9 115 history: new CircularArray()
26fb3c18
JB
116 },
117 active: {
4ba4c7f9 118 history: new CircularArray()
26fb3c18
JB
119 }
120 }
121 })
75de9f41
JB
122 expect(clusterWorkerNode.messageChannel).toBeUndefined()
123 expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
124 expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
125 expect(clusterWorkerNode.tasksQueue.size).toBe(0)
68f1f531
JB
126 expect(clusterWorkerNode.tasksQueueSize()).toBe(
127 clusterWorkerNode.tasksQueue.size
128 )
129 expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
65542a35 130 expect(clusterWorkerNode.onIdleWorkerNodeCount).toBe(0)
75de9f41 131 expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
26fb3c18
JB
132 })
133
134 it('Worker node getTaskFunctionWorkerUsage()', () => {
135 expect(() =>
75de9f41 136 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 137 ).toThrow(
26fb3c18
JB
138 new TypeError(
139 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
140 )
141 )
66979634 142 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
26fb3c18 143 expect(() =>
75de9f41 144 threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
948faff7 145 ).toThrow(
26fb3c18
JB
146 new TypeError(
147 "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
148 )
149 )
66979634 150 threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
6cd5248f 151 expect(
75de9f41 152 threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
6cd5248f 153 ).toStrictEqual({
26fb3c18
JB
154 tasks: {
155 executed: 0,
156 executing: 0,
157 queued: 0,
158 stolen: 0,
159 failed: 0
160 },
161 runTime: {
4ba4c7f9 162 history: new CircularArray()
26fb3c18
JB
163 },
164 waitTime: {
4ba4c7f9 165 history: new CircularArray()
26fb3c18
JB
166 },
167 elu: {
168 idle: {
4ba4c7f9 169 history: new CircularArray()
26fb3c18
JB
170 },
171 active: {
4ba4c7f9 172 history: new CircularArray()
26fb3c18
JB
173 }
174 }
175 })
75de9f41 176 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
26fb3c18
JB
177 tasks: {
178 executed: 0,
179 executing: 0,
180 queued: 0,
181 stolen: 0,
182 failed: 0
183 },
184 runTime: {
4ba4c7f9 185 history: new CircularArray()
26fb3c18
JB
186 },
187 waitTime: {
4ba4c7f9 188 history: new CircularArray()
26fb3c18
JB
189 },
190 elu: {
191 idle: {
4ba4c7f9 192 history: new CircularArray()
26fb3c18
JB
193 },
194 active: {
4ba4c7f9 195 history: new CircularArray()
26fb3c18
JB
196 }
197 }
198 })
75de9f41 199 expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
26fb3c18
JB
200 tasks: {
201 executed: 0,
202 executing: 0,
203 queued: 0,
204 stolen: 0,
205 failed: 0
206 },
207 runTime: {
4ba4c7f9 208 history: new CircularArray()
26fb3c18
JB
209 },
210 waitTime: {
4ba4c7f9 211 history: new CircularArray()
26fb3c18
JB
212 },
213 elu: {
214 idle: {
4ba4c7f9 215 history: new CircularArray()
26fb3c18
JB
216 },
217 active: {
4ba4c7f9 218 history: new CircularArray()
26fb3c18
JB
219 }
220 }
221 })
75de9f41 222 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
26fb3c18 223 })
adee6053
JB
224
225 it('Worker node deleteTaskFunctionWorkerUsage()', () => {
226 expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
227 DEFAULT_TASK_NAME,
228 'fn1',
229 'fn2'
230 ])
231 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
232 expect(
233 threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
234 ).toBe(false)
235 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
236 expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
237 expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
238 })
26fb3c18 239})