refactor: factor out measurement statistics requirements default
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
85a3f8a7 4const TestUtils = require('../../test-utils')
506c2a14 5
a35560ba 6describe('Fixed thread pool test suite', () => {
e1ffb94f
JB
// Number of workers every fixed thread pool of this suite is created with.
const numberOfThreads = 6
// Worker file shared by the default pool and the tasks queue pool.
const testWorkerFile = './tests/worker-files/thread/testWorker.js'
// Error handler used by the pools that log worker errors to the console.
const logWorkerError = e => console.error(e)

// Default pool used by most of the tests below.
const pool = new FixedThreadPool(numberOfThreads, testWorkerFile, {
  errorHandler: logWorkerError
})
// Pool with tasks queuing enabled and a per-worker concurrency of 2.
const queuePool = new FixedThreadPool(numberOfThreads, testWorkerFile, {
  enableTasksQueue: true,
  tasksQueueOptions: {
    concurrency: 2
  },
  errorHandler: logWorkerError
})
// Pool backed by the empty worker file; logs a message when a worker exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.log('empty pool worker exited') }
)
// Pool backed by the echo worker file, created without options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)
// Pool backed by the error worker file.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool backed by the async error worker file.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool backed by the async worker file, created without options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
53
0e2503fc
JB
after('Destroy all pools', async () => {
  // Release the resources held by the suite-level pools, one after the other.
  // `pool` is not listed here: the 'Shutdown test' destroys it.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
63
it('Verify that the function is executed in a worker thread', async () => {
  // Run each worker function through the pool and check the value it returns.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
74
it('Verify that is possible to invoke the execute() method without input', async () => {
  // Calling execute() with no argument is expected to resolve to `false`.
  const outcome = await pool.execute()
  expect(outcome).toBe(false)
})
79
it("Verify that 'busy' event is emitted", async () => {
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  // Keep a handle on every submitted task so that none is left floating.
  const promises = new Set()
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  // The count is checked right after submission, before any task is awaited.
  expect(poolBusy).toBe(numberOfThreads + 1)
  // Let the submitted tasks settle before the test returns, so they are not
  // still pending while later tests use or destroy the same pool.
  await Promise.all(promises)
})
90
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfThreads * maxMultiplier
  // Tasks submitted beyond one per worker are expected to be queued.
  const expectedQueuedTasks = submittedTasks - numberOfThreads

  const pendingTasks = new Set()
  for (let i = 0; i < submittedTasks; i++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(submittedTasks)

  // Right after submission: nothing has completed yet and every worker has a queued task.
  for (const workerNode of queuePool.workerNodes) {
    const tasks = workerNode.workerUsage.tasks
    expect(tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBeGreaterThan(0)
    expect(tasks.maxQueued).toBeGreaterThan(0)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfThreads)
  expect(queuePool.info.queuedTasks).toBe(expectedQueuedTasks)
  expect(queuePool.info.maxQueuedTasks).toBe(expectedQueuedTasks)

  await Promise.all(pendingTasks)

  // Once all tasks have settled: queues are drained and nothing is executing.
  for (const workerNode of queuePool.workerNodes) {
    const tasks = workerNode.workerUsage.tasks
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBeGreaterThan(0)
    expect(tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(1)
  }
})
124
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool backed by the empty worker file is expected to resolve to undefined.
  const outcome = await emptyPool.execute()
  expect(outcome).toBeUndefined()
})
129
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to resolve to a value strictly equal to its input.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
135
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker'

  // Capture the payload of the pool's task error event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Capture the rejection of execute(), if any.
  let caughtError
  try {
    await errorPool.execute(data)
  } catch (e) {
    caughtError = e
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    message: new Error(expectedMessage),
    data
  })

  // Exactly one failed task must be recorded on at least one worker node.
  const hasWorkerWithFailedTask = errorPool.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.failed === 1
  )
  expect(hasWorkerWithFailedTask).toBe(true)
})
163
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker:async'

  // Capture the payload of the pool's task error event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Capture the rejection of execute(), if any.
  let caughtError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    caughtError = e
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    message: new Error(expectedMessage),
    data
  })

  // Exactly one failed task must be recorded on at least one worker node.
  const hasWorkerWithFailedTask = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.failed === 1
  )
  expect(hasWorkerWithFailedTask).toBe(true)
})
191
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall-clock time taken by the task execution.
  const startedAt = performance.now()
  const outcome = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(outcome).toStrictEqual(payload)
  // The task is expected to take at least 2000 ms to resolve.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
200
it('Shutdown test', async () => {
  // Start waiting for the 'exit' events before destroying the pool.
  const exitEventsCount = TestUtils.waitWorkerEvents(
    pool,
    'exit',
    numberOfThreads
  )
  await pool.destroy()
  // One 'exit' event is expected per worker of the pool.
  expect(await exitEventsCount).toBe(numberOfThreads)
})
211
90082c8c
JB
it('Verify that thread pool options are checked', async () => {
  // Use the thread worker file, like every other thread pool in this suite;
  // the cluster worker file does not belong in a thread pool test.
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // Without options, no worker options are set on the pool.
  const defaultPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(defaultPool.opts.workerOptions).toBeUndefined()
  await defaultPool.destroy()

  // Worker options given at construction must be exposed unchanged.
  const customPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  expect(customPool.opts.workerOptions).toStrictEqual({
    env: { TEST: 'test' },
    name: 'test'
  })
  await customPool.destroy()
})
229
it('Should work even without opts in input', async () => {
  // Build a pool from the worker count and worker file only, with no options object.
  const optionlessPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  const outcome = await optionlessPool.execute()
  expect(outcome).toBe(false)
  // Release the resources of the pool created by this test.
  await optionlessPool.destroy()
})
8d3782fa
JB
240
it('Verify that a pool with zero worker fails', async () => {
  // Constructing a fixed pool with 0 workers must throw with the expected message.
  const createPoolWithoutWorker = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  expect(createPoolWithoutWorker).toThrowError(
    'Cannot instantiate a fixed pool with no worker'
  )
})
506c2a14 246})