1a5a75a08df6ab03d84d7fa0aff06f617f59a797
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib/index')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Abstract pool test suite', () => {
// Number of workers requested for the pools built in this suite.
const numberOfWorkers = 1
// Error the pool is expected to throw when a worker lookup fails.
const workerNotFoundInPoolError = new Error('Worker could not be found in the pool worker nodes')
/**
 * Fixed thread pool stub adding a helper that wipes the pool bookkeeping,
 * used by the suite to simulate a worker that cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Resets `workers` to an empty array and clears the promise response map.
   *
   * NOTE(review): the tests in this file read `pool.workerNodes`, not
   * `pool.workers` - confirm `workers` is still the property the pool uses.
   */
  removeAllWorker () {
    this.workers = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, used to
 * simulate creating a pool from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
27
it('Simulate pool creation from a non main thread/process', () => {
  // The stub's isMain() returns false, so constructing it is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error('Cannot start a pool from a worker!')
  expect(createPoolFromWorker).toThrowError(expectedError)
})
40
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty one must both be rejected.
  const invalidPoolFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, '')
  ]
  for (const createPool of invalidPoolFactories) {
    expect(createPool).toThrowError(expectedError)
  }
})
52
it('Verify that numberOfWorkers is checked', () => {
  // No argument at all: the number of workers is missing.
  const createPoolWithoutSize = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutSize).toThrowError(expectedError)
})
60
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  // A negative size is reported as a RangeError.
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
71
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  // A fractional size is reported as a TypeError.
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
82
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // First pool: no options given, so the defaults are expected.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  const defaultOpts = defaultPool.opts
  expect(defaultOpts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultOpts.enableTasksQueue).toBe(false)
  expect(defaultOpts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultOpts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  expect(defaultOpts.messageHandler).toBeUndefined()
  expect(defaultOpts.errorHandler).toBeUndefined()
  expect(defaultOpts.onlineHandler).toBeUndefined()
  expect(defaultOpts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Second pool: every option is overridden and must be reflected as given.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  const customOpts = customPool.opts
  expect(customOpts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customOpts.enableTasksQueue).toBe(true)
  expect(customOpts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customOpts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  expect(customOpts.messageHandler).toStrictEqual(testHandler)
  expect(customOpts.errorHandler).toStrictEqual(testHandler)
  expect(customOpts.onlineHandler).toStrictEqual(testHandler)
  expect(customOpts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
132
it('Simulate worker not found during getWorkerTasksUsage', async () => {
  // The stub extends FixedThreadPool, so it is given the thread worker file
  // like the other thread pools of this suite (not the cluster worker file).
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  // Simulate worker not found.
  pool.removeAllWorker()
  // The lookup is done without any worker argument and is expected to throw.
  expect(() => pool.getWorkerTasksUsage()).toThrowError(
    workerNotFoundInPoolError
  )
  await pool.destroy()
})
148
it('Verify that worker pool tasks usage are initialized', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  // No task was submitted: every statistic must still be at its zero value.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
167
it('Verify that worker pool tasks queue are initialized', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  // Each worker node must start with an empty array as its tasks queue.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Array)
    expect(tasksQueue.length).toBe(0)
  }
  await pool.destroy()
})
180
it('Verify that worker pool tasks usage are computed', async () => {
  // Number of tasks submitted per worker. The statistics checked below are
  // per worker node, so they are compared to this value rather than to the
  // pool-wide total (the two only coincide when numberOfWorkers is 1).
  // Assumes the pool spreads the tasks evenly across its workers.
  const tasksPerWorker = 2
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  const promises = []
  for (let i = 0; i < numberOfWorkers * tasksPerWorker; i++) {
    promises.push(pool.execute())
  }
  // Tasks are submitted but not awaited yet: nothing is counted as run.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(tasksPerWorker)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await Promise.all(promises)
  // Every task has settled: the running counter is back to zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(tasksPerWorker)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
215
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  // Number of tasks submitted per worker. The statistics checked below are
  // per worker node, so they are compared to this value rather than to the
  // pool-wide total (the two only coincide when numberOfWorkers is 1).
  // Assumes the pool spreads the tasks evenly across its workers.
  const tasksPerWorker = 2
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const promises = []
  for (let i = 0; i < numberOfWorkers * tasksPerWorker; i++) {
    promises.push(pool.execute())
  }
  await Promise.all(promises)
  // Statistics gathered while running under the initial strategy.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(tasksPerWorker)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change every statistic is expected to be reset.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
252
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Count every `full` event; the listener is attached before any task is submitted.
  let fullEventCount = 0
  pool.emitter.on(PoolEvents.full, () => ++fullEventCount)
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(fullEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
271
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Count every `busy` event; the listener is attached before any task is submitted.
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
289 })