test: improve event emitter tests
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
// Sizing shared by every fixed thread pool created in this suite.
const numberOfThreads = 6
const tasksConcurrency = 2
// Single error handler reused by the pools that log errors to the console.
const logWorkerError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' below.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)
// Same worker file, with the tasks queue enabled and a per-worker concurrency limit.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool used to check a task function result that is undefined.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)
// Pool used to check that task data comes back unchanged.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)
// Pools used by the sync and async error handling tests.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool used by the asynchronous task function test.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
55
0e2503fc
JB
after('Destroy all pools', async () => {
  // Release the resources held by the shared pools once the suite is done.
  // `pool` is not listed here: the 'Shutdown test' destroys it.
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  // Destroy sequentially, in the listed order.
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})
65
it('Verify that the function is executed in a worker thread', async () => {
  // Each task selects the function to run through the `function` field of its data.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed at all; the reply is expected to be { ok: 1 }.
  const reply = await pool.execute()
  expect(reply).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is created so that the `ready` listener can be attached
  // to a fresh emitter.
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Always release the worker threads, even when an assertion above fails,
    // so a failing test does not leave live workers behind.
    await pool.destroy()
  }
})
98
94407def
JB
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEvents = 0
  pool.emitter.on(PoolEvents.busy, () => {
    ++busyEvents
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfThreads * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(busyEvents).toBe(numberOfThreads + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfThreads * maxMultiplier
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks a single worker is expected to hold in its queue right after submission.
  const queuedPerWorker = maxMultiplier - concurrency

  // First pass: state observed synchronously after submission, before any task is awaited.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(queuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // Second pass: every submitted task has settled.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    // Only a range is asserted for the stolen tasks counters from here on.
    expect(usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
172
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task run by the empty worker pool is expected to resolve with undefined.
  const emptyResult = await emptyPool.execute()
  expect(emptyResult).toBeUndefined()
})
177
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to hand back a structurally equal copy of the payload.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
183
it('Verify that transferable objects are sent to the worker correctly', async () => {
  let error
  let result
  // Case 1: a transfer list made of valid transferable objects must be accepted.
  try {
    result = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    error = e
  }
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toBeUndefined()

  // Case 2: a SharedArrayBuffer in the transfer list must be rejected.
  // Reset both variables first: otherwise `result` would still hold the value
  // of case 1 and any assertion on it would pass vacuously.
  result = undefined
  error = undefined
  try {
    result = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    error = e
  }
  // The call threw, so no result may have been produced.
  expect(result).toBeUndefined()
  expect(error).toStrictEqual(
    new TypeError('Found invalid object in transferList')
  )
})
209
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  // Capture the payload of the `taskError` pool event.
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])

  // Capture the rejection of the task itself.
  let thrown
  try {
    await errorPool.execute(data)
  } catch (err) {
    thrown = err
  }
  expect(thrown).toBeDefined()
  expect(thrown).toBeInstanceOf(Error)
  expect(thrown.message).toBeDefined()
  expect(typeof thrown.message === 'string').toBe(true)
  expect(thrown.message).toBe('Error Message from ThreadWorker')

  // The event carries the task name, the error and the original task data.
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker'),
    data
  })

  // Exactly one worker node must have accounted for the failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
240
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  // Capture the payload of the `taskError` pool event.
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])

  // Capture the rejection of the task itself.
  let thrown
  try {
    await asyncErrorPool.execute(data)
  } catch (err) {
    thrown = err
  }
  expect(thrown).toBeDefined()
  expect(thrown).toBeInstanceOf(Error)
  expect(thrown.message).toBeDefined()
  expect(typeof thrown.message === 'string').toBe(true)
  expect(thrown.message).toBe('Error Message from ThreadWorker:async')

  // The event carries the task name, the error and the original task data.
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker:async'),
    data
  })

  // Exactly one worker node must have accounted for the failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
273
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startedAt = performance.now()
  const result = await asyncPool.execute(data)
  const elapsedMs = performance.now() - startedAt
  expect(result).toStrictEqual(data)
  // NOTE(review): the 2000 ms lower bound presumably mirrors a delay inside
  // asyncWorker.js — confirm against that worker file.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
282
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before destroy() is called.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfThreads)
  let destroyEvents = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    ++destroyEvents
  })
  await pool.destroy()
  const exitEventsCount = await workersExited
  expect(pool.started).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventsCount).toBe(numberOfThreads)
  expect(destroyEvents).toBe(1)
})
294
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  // Without options, no worker options are exposed by the pool.
  let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(pool.opts.workerOptions).toBeUndefined()
  } finally {
    // Release the worker threads even when the assertion above fails.
    await pool.destroy()
  }
  // With worker options, they must be exposed unchanged.
  pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  try {
    expect(pool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
  } finally {
    // Release the worker threads even when the assertion above fails.
    await pool.destroy()
  }
})
312
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  // The options argument is omitted on purpose.
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // execution or the assertion above fails.
    await pool.destroy()
  }
})
8d3782fa 321
7d27ec0d
JB
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the 'exit' events of the worker that is about to be destroyed.
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
    expect(exitEvent).toBe(1)
    // The destroyed worker node must have been removed from the pool.
    expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
  } finally {
    // Release the remaining worker threads even when an assertion above fails.
    await pool.destroy()
  }
})
335
8d3782fa
JB
// The constructor is expected to throw synchronously, so the test needs no
// async function. `toThrow` is the canonical matcher name; the `toThrowError`
// alias is no longer available in recent `expect` releases.
it('Verify that a pool with zero worker fails', () => {
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
506c2a14 341})