feat: add 'full' event on dynamic pool emitter
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 FixedClusterPool,
4 DynamicThreadPool,
5 FixedThreadPool,
6 WorkerChoiceStrategies
7 } = require('../../../lib/index')
8
9 describe('Abstract pool test suite', () => {
10 const numberOfWorkers = 1
11 const workerNotFoundInPoolError = new Error(
12 'Worker could not be found in the pool'
13 )
14 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
15 removeAllWorker () {
16 this.workers = []
17 this.promiseResponseMap.clear()
18 }
19 }
20 class StubPoolWithIsMain extends FixedThreadPool {
21 isMain () {
22 return false
23 }
24 }
25
26 it('Simulate pool creation from a non main thread/process', () => {
27 expect(
28 () =>
29 new StubPoolWithIsMain(
30 numberOfWorkers,
31 './tests/worker-files/thread/testWorker.js',
32 {
33 errorHandler: e => console.error(e)
34 }
35 )
36 ).toThrowError(new Error('Cannot start a pool from a worker!'))
37 })
38
39 it('Verify that filePath is checked', () => {
40 const expectedError = new Error(
41 'Please specify a file with a worker implementation'
42 )
43 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
44 expectedError
45 )
46 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
47 expectedError
48 )
49 })
50
51 it('Verify that numberOfWorkers is checked', () => {
52 expect(() => new FixedThreadPool()).toThrowError(
53 new Error(
54 'Cannot instantiate a pool without specifying the number of workers'
55 )
56 )
57 })
58
59 it('Verify that a negative number of workers is checked', () => {
60 expect(
61 () =>
62 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
63 ).toThrowError(
64 new RangeError(
65 'Cannot instantiate a pool with a negative number of workers'
66 )
67 )
68 })
69
70 it('Verify that a non integer number of workers is checked', () => {
71 expect(
72 () =>
73 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
74 ).toThrowError(
75 new TypeError(
76 'Cannot instantiate a pool with a non integer number of workers'
77 )
78 )
79 })
80
81 it('Verify that pool options are checked', async () => {
82 let pool = new FixedThreadPool(
83 numberOfWorkers,
84 './tests/worker-files/thread/testWorker.js'
85 )
86 expect(pool.opts.enableEvents).toBe(true)
87 expect(pool.emitter).toBeDefined()
88 expect(pool.opts.workerChoiceStrategy).toBe(
89 WorkerChoiceStrategies.ROUND_ROBIN
90 )
91 expect(pool.opts.messageHandler).toBeUndefined()
92 expect(pool.opts.errorHandler).toBeUndefined()
93 expect(pool.opts.onlineHandler).toBeUndefined()
94 expect(pool.opts.exitHandler).toBeUndefined()
95 await pool.destroy()
96 const testHandler = () => console.log('test handler executed')
97 pool = new FixedThreadPool(
98 numberOfWorkers,
99 './tests/worker-files/thread/testWorker.js',
100 {
101 workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
102 enableEvents: false,
103 messageHandler: testHandler,
104 errorHandler: testHandler,
105 onlineHandler: testHandler,
106 exitHandler: testHandler
107 }
108 )
109 expect(pool.opts.enableEvents).toBe(false)
110 expect(pool.emitter).toBeUndefined()
111 expect(pool.opts.workerChoiceStrategy).toBe(
112 WorkerChoiceStrategies.LESS_USED
113 )
114 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
115 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
116 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
117 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
118 await pool.destroy()
119 })
120
121 it('Simulate worker not found during getWorkerTasksUsage', async () => {
122 const pool = new StubPoolWithRemoveAllWorker(
123 numberOfWorkers,
124 './tests/worker-files/cluster/testWorker.js',
125 {
126 errorHandler: e => console.error(e)
127 }
128 )
129 // Simulate worker not found.
130 pool.removeAllWorker()
131 expect(() => pool.getWorkerTasksUsage()).toThrowError(
132 workerNotFoundInPoolError
133 )
134 await pool.destroy()
135 })
136
137 it('Verify that worker pool tasks usage are initialized', async () => {
138 const pool = new FixedClusterPool(
139 numberOfWorkers,
140 './tests/worker-files/cluster/testWorker.js'
141 )
142 for (const workerItem of pool.workers) {
143 expect(workerItem.tasksUsage).toBeDefined()
144 expect(workerItem.tasksUsage.run).toBe(0)
145 expect(workerItem.tasksUsage.running).toBe(0)
146 expect(workerItem.tasksUsage.runTime).toBe(0)
147 expect(workerItem.tasksUsage.avgRunTime).toBe(0)
148 expect(workerItem.tasksUsage.error).toBe(0)
149 }
150 await pool.destroy()
151 })
152
153 it('Verify that worker pool tasks usage are computed', async () => {
154 const pool = new FixedClusterPool(
155 numberOfWorkers,
156 './tests/worker-files/cluster/testWorker.js'
157 )
158 const promises = []
159 for (let i = 0; i < numberOfWorkers * 2; i++) {
160 promises.push(pool.execute())
161 }
162 for (const workerItem of pool.workers) {
163 expect(workerItem.tasksUsage).toBeDefined()
164 expect(workerItem.tasksUsage.run).toBe(0)
165 expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
166 expect(workerItem.tasksUsage.runTime).toBe(0)
167 expect(workerItem.tasksUsage.avgRunTime).toBe(0)
168 expect(workerItem.tasksUsage.error).toBe(0)
169 }
170 await Promise.all(promises)
171 for (const workerItem of pool.workers) {
172 expect(workerItem.tasksUsage).toBeDefined()
173 expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
174 expect(workerItem.tasksUsage.running).toBe(0)
175 expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
176 expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
177 expect(workerItem.tasksUsage.error).toBe(0)
178 }
179 await pool.destroy()
180 })
181
182 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
183 const pool = new DynamicThreadPool(
184 numberOfWorkers,
185 numberOfWorkers,
186 './tests/worker-files/thread/testWorker.js'
187 )
188 const promises = []
189 for (let i = 0; i < numberOfWorkers * 2; i++) {
190 promises.push(pool.execute())
191 }
192 await Promise.all(promises)
193 for (const workerItem of pool.workers) {
194 expect(workerItem.tasksUsage).toBeDefined()
195 expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
196 expect(workerItem.tasksUsage.running).toBe(0)
197 expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
198 expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
199 expect(workerItem.tasksUsage.error).toBe(0)
200 }
201 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
202 for (const workerItem of pool.workers) {
203 expect(workerItem.tasksUsage).toBeDefined()
204 expect(workerItem.tasksUsage.run).toBe(0)
205 expect(workerItem.tasksUsage.running).toBe(0)
206 expect(workerItem.tasksUsage.runTime).toBe(0)
207 expect(workerItem.tasksUsage.avgRunTime).toBe(0)
208 expect(workerItem.tasksUsage.error).toBe(0)
209 }
210 await pool.destroy()
211 })
212
213 it("Verify that pool event emitter 'full' event can register a callback", async () => {
214 const pool = new DynamicThreadPool(
215 numberOfWorkers,
216 numberOfWorkers,
217 './tests/worker-files/thread/testWorker.js'
218 )
219 const promises = []
220 let poolFull = 0
221 pool.emitter.on('full', () => ++poolFull)
222 for (let i = 0; i < numberOfWorkers * 2; i++) {
223 promises.push(pool.execute())
224 }
225 await Promise.all(promises)
226 // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
227 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
228 expect(poolFull).toBe(numberOfWorkers + 1)
229 await pool.destroy()
230 })
231
232 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
233 const pool = new FixedThreadPool(
234 numberOfWorkers,
235 './tests/worker-files/thread/testWorker.js'
236 )
237 const promises = []
238 let poolBusy = 0
239 pool.emitter.on('busy', () => ++poolBusy)
240 for (let i = 0; i < numberOfWorkers * 2; i++) {
241 promises.push(pool.execute())
242 }
243 await Promise.all(promises)
244 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
245 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
246 expect(poolBusy).toBe(numberOfWorkers + 1)
247 await pool.destroy()
248 })
249 })