feat: add option to enable worker tasks queue
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib/index')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Abstract pool test suite', () => {
// Pool size shared by every test in this suite.
const numberOfWorkers = 1
// Error the tests expect when a worker lookup fails.
const workerNotFoundMessage =
  'Worker could not be found in the pool worker nodes'
const workerNotFoundInPoolError = new Error(workerNotFoundMessage)
/**
 * Test double built on `FixedThreadPool`, used to simulate a pool in which
 * a worker cannot be found.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Empties the pool's worker bookkeeping and clears the promise response map.
   */
  removeAllWorker () {
    // The rest of this suite reads the pool's workers through `workerNodes`
    // (and the expected error mentions "worker nodes"); resetting a `workers`
    // field would leave that collection untouched.
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Test double built on `FixedThreadPool` whose `isMain()` always answers
 * `false`, as if the pool were being created from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsOnMain = false
    return runsOnMain
  }
}
27
// StubPoolWithIsMain reports isMain() === false, so construction must be refused.
it('Simulate pool creation from a non main thread/process', () => {
  const expectedError = new Error('Cannot start a pool from a worker!')
  const createPoolFromWorker = () => {
    const poolOptions = { errorHandler: e => console.error(e) }
    return new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      poolOptions
    )
  }
  expect(createPoolFromWorker).toThrowError(expectedError)
})
40
// Both a missing and an empty worker file path must be rejected with the same error.
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  const withoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const withEmptyFilePath = () => new FixedThreadPool(numberOfWorkers, '')
  for (const createPool of [withoutFilePath, withEmptyFilePath]) {
    expect(createPool).toThrowError(expectedError)
  }
})
52
// Constructing a pool with no arguments at all must be rejected.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutArguments = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )
  expect(createPoolWithoutArguments).toThrowError(expectedError)
})
60
// A negative worker count must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const negativeWorkerCount = -1
  const createPool = () =>
    new FixedClusterPool(
      negativeWorkerCount,
      './tests/worker-files/cluster/testWorker.js'
    )
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
71
// A fractional worker count must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const fractionalWorkerCount = 0.25
  const createPool = () =>
    new FixedThreadPool(
      fractionalWorkerCount,
      './tests/worker-files/thread/testWorker.js'
    )
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
82
// First checks the options of a pool built without an options object,
// then checks that explicitly supplied options are kept as given.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool created without any options.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Pool created with every option set explicitly.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    enableEvents: false,
    enableTasksQueue: true,
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(
    numberOfWorkers,
    workerFile,
    customOptions
  )
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
  expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
  expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
  expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
125
// Looking up tasks usage after the stub dropped its workers must throw.
it('Simulate worker not found during getWorkerTasksUsage', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is handed the cluster
  // worker file here — confirm whether the thread worker file was intended.
  const poolOptions = { errorHandler: e => console.error(e) }
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    poolOptions
  )
  // Simulate worker not found.
  pool.removeAllWorker()
  const lookUpTasksUsage = () => pool.getWorkerTasksUsage()
  expect(lookUpTasksUsage).toThrowError(workerNotFoundInPoolError)
  await pool.destroy()
})
141
// Every worker node of a freshly created pool must start with zeroed tasks usage.
it('Verify that worker pool tasks usage are initialized', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
160
// Every worker node of a freshly created pool must own an empty array as tasks queue.
it('Verify that worker pool tasks queue are initialized', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Array)
    expect(tasksQueue.length).toBe(0)
  }
  await pool.destroy()
})
173
// Checks the tasks usage counters twice: right after the tasks are submitted
// (before awaiting them) and again once every task has settled.
it('Verify that worker pool tasks usage are computed', async () => {
  const workerFile = './tests/worker-files/cluster/testWorker.js'
  const pool = new FixedClusterPool(numberOfWorkers, workerFile)
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )

  // Tasks submitted but not awaited yet.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(submittedTasks)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  await Promise.all(pendingTasks)

  // All tasks settled.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
208
// Runs some tasks, checks the counters moved, then switches the worker choice
// strategy and checks every counter is back to zero.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  const submittedTasks = numberOfWorkers * 2
  const pendingTasks = Array.from({ length: submittedTasks }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)

  // Usage recorded after the tasks ran.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(submittedTasks)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }

  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)

  // Usage after the strategy change.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
245
// Counts how many times the pool emitter fires the `full` event while tasks are submitted.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolFull = 0
  pool.emitter.on(PoolEvents.full, () => {
    poolFull += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `full` event is triggered when the number of tasks submitted at once reaches the number of dynamic pool workers.
  // Submitting numberOfWorkers * 2 tasks in a row therefore triggers it numberOfWorkers + 1 times in total.
  expect(poolFull).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
264
// Counts how many times the pool emitter fires the `busy` event while tasks are submitted.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => {
    poolBusy += 1
  })
  const pendingTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(pendingTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // Submitting numberOfWorkers * 2 tasks in a row therefore triggers it numberOfWorkers + 1 times in total.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
282 })