5063c7fd31c6f058450536dfea3a6519873a88ba
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib/index')
9 const { CircularArray } = require('../../../lib/circular-array')
10
11 describe('Abstract pool test suite', () => {
// Pool size used by every test of this suite.
const numberOfWorkers = 1
// Error expected from getWorkerTasksUsage() once the pool no longer tracks any worker.
const workerNotFoundInPoolError = new Error('Worker could not be found in the pool')
/**
 * Fixed thread pool stub adding a helper that makes the pool forget its workers,
 * used to simulate a "worker not found" condition.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the pool's worker list with an empty array and clears the
   * promise response map. Nothing is terminated here: only the pool's
   * bookkeeping is dropped.
   */
  removeAllWorker () {
    this.workers = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, used to
 * simulate a pool being instantiated from a worker.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
27
// A pool whose isMain() returns false must refuse to be constructed.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: e => console.error(e)
      }
    )
  // `toThrow` is the canonical matcher name; `toThrowError` is only an alias
  // of it and is not available in newer `expect` releases.
  expect(createPoolFromWorker).toThrow(
    new Error('Cannot start a pool from a worker!')
  )
})
40
// Pool construction must fail when the worker file path is missing or empty.
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Constructor argument lists: file path omitted, then file path empty.
  const invalidArgumentLists = [[numberOfWorkers], [numberOfWorkers, '']]
  for (const constructorArgs of invalidArgumentLists) {
    // `toThrow` is the canonical matcher name; `toThrowError` is only an
    // alias of it and is not available in newer `expect` releases.
    expect(() => new FixedThreadPool(...constructorArgs)).toThrow(
      expectedError
    )
  }
})
52
// Pool construction must fail when no number of workers is given.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () => new FixedThreadPool()
  // `toThrow` is the canonical matcher name; `toThrowError` is only an alias
  // of it and is not available in newer `expect` releases.
  expect(createPoolWithoutSize).toThrow(
    new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  )
})
60
// Pool construction must fail when the number of workers is negative.
it('Verify that a negative number of workers is checked', () => {
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  // `toThrow` is the canonical matcher name; `toThrowError` is only an alias
  // of it and is not available in newer `expect` releases.
  expect(createPoolWithNegativeSize).toThrow(
    new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  )
})
71
// Pool construction must fail when the number of workers is not an integer.
it('Verify that a non integer number of workers is checked', () => {
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  // `toThrow` is the canonical matcher name; `toThrowError` is only an alias
  // of it and is not available in newer `expect` releases.
  expect(createPoolWithFractionalSize).toThrow(
    new TypeError(
      'Cannot instantiate a pool with a non integer number of workers'
    )
  )
})
82
// Checks the options seen on a pool built without options, then the options
// seen on a pool built with every option supplied explicitly.
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool built without an options object.
  let pool = new FixedThreadPool(numberOfWorkers, workerFile)
  try {
    expect(pool.opts.enableEvents).toBe(true)
    expect(pool.emitter).toBeDefined()
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
    expect(pool.opts.messageHandler).toBeUndefined()
    expect(pool.opts.errorHandler).toBeUndefined()
    expect(pool.opts.onlineHandler).toBeUndefined()
    expect(pool.opts.exitHandler).toBeUndefined()
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }

  // Pool built with every option set.
  const testHandler = () => console.log('test handler executed')
  pool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    enableEvents: false,
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    expect(pool.opts.enableEvents).toBe(false)
    expect(pool.emitter).toBeUndefined()
    expect(pool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.LESS_USED
    )
    expect(pool.opts.messageHandler).toStrictEqual(testHandler)
    expect(pool.opts.errorHandler).toStrictEqual(testHandler)
    expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
    expect(pool.opts.exitHandler).toStrictEqual(testHandler)
  } finally {
    await pool.destroy()
  }
})
122
// getWorkerTasksUsage() must throw once the pool no longer tracks any worker.
it('Simulate worker not found during getWorkerTasksUsage', async () => {
  // NOTE(review): the stub extends FixedThreadPool but is handed the cluster
  // test worker file; confirm this pairing is intended before changing it.
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    // `toThrow` is the canonical matcher name; `toThrowError` is only an
    // alias of it and is not available in newer `expect` releases.
    expect(() => pool.getWorkerTasksUsage()).toThrow(
      workerNotFoundInPoolError
    )
  } finally {
    // Run destroy() even when the assertion throws.
    // NOTE(review): removeAllWorker() emptied the worker list, so destroy()
    // presumably has no worker left to terminate here; verify.
    await pool.destroy()
  }
})
138
// Every worker of a freshly created pool must start with zeroed tasks usage.
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const { tasksUsage } of pool.workers) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
      expect(tasksUsage.avgRunTime).toBe(0)
      expect(tasksUsage.medRunTime).toBe(0)
      expect(tasksUsage.error).toBe(0)
    }
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }
})
156
// Tasks usage must count tasks as running while they are in flight and as
// run once they have all settled.
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const submittedTasks = numberOfWorkers * 2
    const promises = []
    for (let i = 0; i < submittedTasks; i++) {
      promises.push(pool.execute())
    }
    // Tasks submitted but not awaited yet. The per-worker expectation equals
    // the total number of submitted tasks, which matches a suite running
    // with a single worker.
    for (const { tasksUsage } of pool.workers) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(submittedTasks)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
      expect(tasksUsage.avgRunTime).toBe(0)
      expect(tasksUsage.medRunTime).toBe(0)
      expect(tasksUsage.error).toBe(0)
    }
    await Promise.all(promises)
    // All tasks settled: nothing is running anymore and every task is counted.
    for (const { tasksUsage } of pool.workers) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(submittedTasks)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.medRunTime).toBe(0)
      expect(tasksUsage.error).toBe(0)
    }
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }
})
189
// Changing the worker choice strategy must zero the tasks usage of every worker.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const submittedTasks = numberOfWorkers * 2
    const promises = []
    for (let i = 0; i < submittedTasks; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // Usage accumulated by the tasks that were just executed.
    for (const { tasksUsage } of pool.workers) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(submittedTasks)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
      expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
      expect(tasksUsage.medRunTime).toBe(0)
      expect(tasksUsage.error).toBe(0)
    }
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    // Usage right after the strategy switch: every counter is back to zero.
    for (const { tasksUsage } of pool.workers) {
      expect(tasksUsage).toBeDefined()
      expect(tasksUsage.run).toBe(0)
      expect(tasksUsage.running).toBe(0)
      expect(tasksUsage.runTime).toBe(0)
      expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
      expect(tasksUsage.avgRunTime).toBe(0)
      expect(tasksUsage.medRunTime).toBe(0)
      expect(tasksUsage.error).toBe(0)
    }
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }
})
224
// A listener registered on the pool emitter must be called for `full` events.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = []
    let poolFull = 0
    pool.emitter.on(PoolEvents.full, () => ++poolFull)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // The `full` event is triggered when the number of tasks submitted at once reaches the number of dynamic pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
    expect(poolFull).toBe(numberOfWorkers + 1)
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }
})
243
// A listener registered on the pool emitter must be called for `busy` events.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = []
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  } finally {
    // Destroy the pool even when an assertion throws, so a failing test does
    // not leave its workers running.
    await pool.destroy()
  }
})
261 })