test: add tests for default values
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
// Sizing shared by every pool created for this suite.
const numberOfThreads = 6
const tasksConcurrency = 2

// Error handler handed to the pools that are configured with one:
// it only forwards the error to the console.
const logWorkerError = (e) => console.error(e)

// General purpose pool used by most of the tests below.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)

// Pool with the tasks queue enabled and a per-worker concurrency limit.
const queuePoolOptions = {
  enableTasksQueue: true,
  tasksQueueOptions: {
    concurrency: tasksConcurrency
  },
  errorHandler: logWorkerError
}
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  queuePoolOptions
)

// Pool backed by emptyWorker.js; logs a message whenever a worker exits.
const emptyPoolOptions = {
  exitHandler: () => console.info('empty pool worker exited')
}
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  emptyPoolOptions
)

// Pool backed by echoWorker.js, created with default options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pools backed by the error worker files (sync and async flavours).
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool backed by asyncWorker.js, created with default options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
55
after('Destroy all pools', async () => {
  // Release the resources held by the shared pools once the suite is over.
  // Pools are destroyed one at a time, in this exact order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
65
it('Verify that the function is executed in a worker thread', async () => {
  // Run the fibonacci task function and check the value sent back.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // Same check with the factorial task function.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data at all: the worker is still expected to answer.
  const output = await pool.execute()
  expect(output).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is needed here so that the listener can be attached
  // right after construction.
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always release the workers, even when an assertion above fails,
    // so that a failing test does not leave threads behind.
    await pool.destroy()
  }
})
96
it("Verify that 'busy' event is emitted", async () => {
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfThreads * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So it fires numberOfThreads + 1 times in total when numberOfThreads * 2 tasks are submitted to the fixed pool.
  expect(busyEventCount).toBe(numberOfThreads + 1)
})
109
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfThreads * maxMultiplier
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(totalTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks expected to wait in each worker queue once `concurrency` of them are running.
  const queuedPerWorker = maxMultiplier - concurrency

  // First pass: everything has been submitted, nothing has been awaited yet.
  for (const workerNode of queuePool.workerNodes) {
    const taskStats = workerNode.usage.tasks
    expect(taskStats.executing).toBeGreaterThanOrEqual(0)
    expect(taskStats.executing).toBeLessThanOrEqual(concurrency)
    expect(taskStats.executed).toBe(0)
    expect(taskStats.queued).toBe(queuedPerWorker)
    expect(taskStats.maxQueued).toBe(queuedPerWorker)
    expect(taskStats.stolen).toBe(0)
  }
  const infoWhileRunning = queuePool.info
  expect(infoWhileRunning.executedTasks).toBe(0)
  expect(infoWhileRunning.executingTasks).toBe(numberOfThreads * concurrency)
  expect(infoWhileRunning.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(infoWhileRunning.maxQueuedTasks).toBe(
    numberOfThreads * queuedPerWorker
  )
  expect(infoWhileRunning.backPressure).toBe(false)
  expect(infoWhileRunning.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // Second pass: every submitted task has settled.
  for (const workerNode of queuePool.workerNodes) {
    const taskStats = workerNode.usage.tasks
    expect(taskStats.executing).toBeGreaterThanOrEqual(0)
    expect(taskStats.executing).toBeLessThanOrEqual(totalTasks)
    expect(taskStats.executed).toBe(maxMultiplier)
    expect(taskStats.queued).toBe(0)
    expect(taskStats.maxQueued).toBe(queuedPerWorker)
    expect(taskStats.stolen).toBeGreaterThanOrEqual(0)
    expect(taskStats.stolen).toBeLessThanOrEqual(totalTasks)
  }
  const infoOnceDone = queuePool.info
  expect(infoOnceDone.executedTasks).toBe(totalTasks)
  expect(infoOnceDone.backPressure).toBe(false)
  expect(infoOnceDone.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(infoOnceDone.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
168
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool backed by emptyWorker.js is expected to resolve with no value.
  expect(await emptyPool.execute()).toBeUndefined()
})
173
it('Verify that data are sent to the worker correctly', async () => {
  // The pool backed by echoWorker.js must hand back a value equal to its input.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
179
it('Verify that transferable objects are sent to the worker correctly', async () => {
  let error
  let result
  // First attempt: a transfer list holding an ArrayBuffer and a MessagePort
  // must be accepted and the task must complete normally.
  try {
    result = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    error = e
  }
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toBeUndefined()

  // Start the second attempt from a clean slate: without this reset the
  // assertion on `result` would only re-check the value left by the first call.
  error = undefined
  result = undefined
  // Second attempt: a SharedArrayBuffer in the transfer list must be rejected.
  try {
    result = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    error = e
  }
  // The call failed, so no result may have been produced.
  expect(result).toBeUndefined()
  expect(error).toStrictEqual(
    new TypeError('Found invalid object in transferList')
  )
})
205
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }

  // Capture what the pool emits on its `taskError` event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, (taskErrorEvent) => {
    emittedTaskError = taskErrorEvent
  })

  // Capture what the caller of execute() receives.
  let caughtError
  try {
    await errorPool.execute(payload)
  } catch (err) {
    caughtError = err
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe('Error Message from ThreadWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker'),
    data: payload
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasWorkerWithOneFailure = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
234
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }

  // Capture what the pool emits on its `taskError` event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (taskErrorEvent) => {
    emittedTaskError = taskErrorEvent
  })

  // Capture what the caller of execute() receives.
  let caughtError
  try {
    await asyncErrorPool.execute(payload)
  } catch (err) {
    caughtError = err
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe('Error Message from ThreadWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker:async'),
    data: payload
  })

  // At least one worker node must have recorded exactly one failed task.
  const hasWorkerWithOneFailure = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
263
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // NOTE(review): presumably asyncWorker.js waits about 2 seconds before
  // answering, hence the lower bound in milliseconds; confirm against the worker file.
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
272
it('Shutdown test', async () => {
  // Start listening for worker 'exit' events before the pool is destroyed.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfThreads)
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })

  await pool.destroy()
  const exitEventCount = await workersExited

  expect(pool.started).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfThreads)
  expect(destroyEventCount).toBe(1)
})
284
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // Without options, no worker options are expected on the pool.
  let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(pool.opts.workerOptions).toBeUndefined()
  } finally {
    // Release the workers even when the assertion above fails.
    await pool.destroy()
  }

  // Worker options given at construction must be exposed as provided.
  pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  try {
    expect(pool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
  } finally {
    // Release the workers even when the assertion above fails.
    await pool.destroy()
  }
})
302
it('Should work even without opts in input', async () => {
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when
    // the task or the assertion above fails.
    await pool.destroy()
  }
})
8d3782fa
JB
313
it('Verify that a pool with zero worker fails', () => {
  // The constructor is expected to throw synchronously, so nothing needs
  // to be awaited. `toThrow` is used as the canonical matcher name;
  // `toThrowError` is only an alias of it.
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
506c2a14 319})