refactor: add PoolEvents/PoolEvent types
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib/index')
9
10 describe('Abstract pool test suite', () => {
// Number of workers given to every pool created by this suite.
const numberOfWorkers = 1
// Error the suite expects when a worker lookup in the pool fails.
const workerNotFoundInPoolError = new Error('Worker could not be found in the pool')
/**
 * Test double built on FixedThreadPool that adds a way to empty the pool's
 * bookkeeping, so the suite can exercise the "worker not found" code path.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Clears the promise response map and replaces the workers list with a new
   * empty array. Nothing here terminates the worker instances themselves.
   */
  removeAllWorker () {
    this.promiseResponseMap.clear()
    this.workers = []
  }
}
/**
 * Test double built on FixedThreadPool whose isMain() always answers `false`,
 * used by the suite to simulate pool creation from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
26
// A pool whose isMain() reports false must refuse to be constructed.
it('Simulate pool creation from a non main thread/process', () => {
  const createPoolOutsideMain = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  const expectedError = new Error('Cannot start a pool from a worker!')

  expect(createPoolOutsideMain).toThrowError(expectedError)
})
39
// Both an omitted and an empty worker file path must be rejected.
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  const constructorArgsWithoutFilePath = [
    [numberOfWorkers],
    [numberOfWorkers, '']
  ]

  for (const constructorArgs of constructorArgsWithoutFilePath) {
    const createPool = () => new FixedThreadPool(...constructorArgs)
    expect(createPool).toThrowError(expectedError)
  }
})
51
// Constructing a pool with no arguments at all must be rejected.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutArguments = () => new FixedThreadPool()
  const expectedError = new Error(
    'Cannot instantiate a pool without specifying the number of workers'
  )

  expect(createPoolWithoutArguments).toThrowError(expectedError)
})
59
// A negative worker count must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const createPoolWithNegativeSize = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )

  expect(createPoolWithNegativeSize).toThrowError(expectedError)
})
70
// A fractional worker count must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const createPoolWithFractionalSize = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )

  expect(createPoolWithFractionalSize).toThrowError(expectedError)
})
81
// Checks the options seen on a pool built without options, then the options
// seen on a pool built with every option set explicitly.
it('Verify that pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // First pool: no options object is passed.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFilePath)
  try {
    expect(defaultPool.opts.enableEvents).toBe(true)
    expect(defaultPool.emitter).toBeDefined()
    expect(defaultPool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.ROUND_ROBIN
    )
    expect(defaultPool.opts.messageHandler).toBeUndefined()
    expect(defaultPool.opts.errorHandler).toBeUndefined()
    expect(defaultPool.opts.onlineHandler).toBeUndefined()
    expect(defaultPool.opts.exitHandler).toBeUndefined()
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await defaultPool.destroy()
  }

  // Second pool: every option is overridden.
  const testHandler = () => console.log('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFilePath, {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    enableEvents: false,
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  try {
    expect(customPool.opts.enableEvents).toBe(false)
    // With events disabled, no emitter is expected on the pool.
    expect(customPool.emitter).toBeUndefined()
    expect(customPool.opts.workerChoiceStrategy).toBe(
      WorkerChoiceStrategies.LESS_USED
    )
    expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
    expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
    expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
    expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await customPool.destroy()
  }
})
121
// getWorkerTasksUsage() must throw once the pool's workers list is emptied.
it('Simulate worker not found during getWorkerTasksUsage', async () => {
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    // The stub extends FixedThreadPool, so it is given the thread worker file
    // rather than the cluster one.
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // Simulate worker not found.
    pool.removeAllWorker()
    expect(() => pool.getWorkerTasksUsage()).toThrowError(
      workerNotFoundInPoolError
    )
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
137
// Every tasks usage counter must be zero on a pool that has run no task.
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    for (const workerItem of pool.workers) {
      expect(workerItem.tasksUsage).toBeDefined()
      expect(workerItem.tasksUsage.run).toBe(0)
      expect(workerItem.tasksUsage.running).toBe(0)
      expect(workerItem.tasksUsage.runTime).toBe(0)
      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
      expect(workerItem.tasksUsage.error).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
153
// Checks the tasks usage counters while tasks are in flight, then again once
// all of them have completed.
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  try {
    const submittedTasks = numberOfWorkers * 2
    const promises = []
    for (let i = 0; i < submittedTasks; i++) {
      promises.push(pool.execute())
    }
    // Tasks are submitted but not awaited yet: all counted as running, none
    // as run.
    // NOTE(review): expecting the total on each worker looks valid only
    // because numberOfWorkers is 1 — confirm before raising it.
    for (const workerItem of pool.workers) {
      expect(workerItem.tasksUsage).toBeDefined()
      expect(workerItem.tasksUsage.run).toBe(0)
      expect(workerItem.tasksUsage.running).toBe(submittedTasks)
      expect(workerItem.tasksUsage.runTime).toBe(0)
      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
      expect(workerItem.tasksUsage.error).toBe(0)
    }
    await Promise.all(promises)
    // Every task has completed: all counted as run, none as running.
    for (const workerItem of pool.workers) {
      expect(workerItem.tasksUsage).toBeDefined()
      expect(workerItem.tasksUsage.run).toBe(submittedTasks)
      expect(workerItem.tasksUsage.running).toBe(0)
      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
      expect(workerItem.tasksUsage.error).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
182
// Switching the worker choice strategy must reset every tasks usage counter.
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const submittedTasks = numberOfWorkers * 2
    const promises = []
    for (let i = 0; i < submittedTasks; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // Counters after the tasks have completed, before the strategy change.
    for (const workerItem of pool.workers) {
      expect(workerItem.tasksUsage).toBeDefined()
      expect(workerItem.tasksUsage.run).toBe(submittedTasks)
      expect(workerItem.tasksUsage.running).toBe(0)
      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
      expect(workerItem.tasksUsage.error).toBe(0)
    }
    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
    // Counters after the strategy change: everything back to zero.
    for (const workerItem of pool.workers) {
      expect(workerItem.tasksUsage).toBeDefined()
      expect(workerItem.tasksUsage.run).toBe(0)
      expect(workerItem.tasksUsage.running).toBe(0)
      expect(workerItem.tasksUsage.runTime).toBe(0)
      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
      expect(workerItem.tasksUsage.error).toBe(0)
    }
  } finally {
    // Always destroy the pool, even when an expectation above throws.
    await pool.destroy()
  }
})
213
// A listener registered for the `full` event on the pool emitter must be
// called the expected number of times.
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = []
    let poolFull = 0
    pool.emitter.on(PoolEvents.full, () => ++poolFull)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // The `full` event is triggered when the number of tasks submitted at once
    // reaches the number of dynamic pool workers. So it fires
    // numberOfWorkers + 1 times in total for a loop submitting up to
    // numberOfWorkers * 2 tasks to the dynamic pool.
    expect(poolFull).toBe(numberOfWorkers + 1)
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
232
// A listener registered for the `busy` event on the pool emitter must be
// called the expected number of times.
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const promises = []
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfWorkers * 2; i++) {
      promises.push(pool.execute())
    }
    await Promise.all(promises)
    // The `busy` event is triggered when the number of tasks submitted at once
    // reaches the number of fixed pool workers. So it fires
    // numberOfWorkers + 1 times in total for a loop submitting up to
    // numberOfWorkers * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfWorkers + 1)
  } finally {
    // Always destroy the pool, even when the expectation above throws.
    await pool.destroy()
  }
})
250 })