chore: generate documentation
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
079de991 6describe('Fixed thread pool test suite', () => {
e1ffb94f 7 const numberOfThreads = 6
4e377863 8 const tasksConcurrency = 2
e1ffb94f
JB
9 const pool = new FixedThreadPool(
10 numberOfThreads,
11 './tests/worker-files/thread/testWorker.js',
12 {
8ebe6c30 13 errorHandler: (e) => console.error(e)
e1ffb94f
JB
14 }
15 )
594bfb84
JB
16 const queuePool = new FixedThreadPool(
17 numberOfThreads,
18 './tests/worker-files/thread/testWorker.js',
19 {
20 enableTasksQueue: true,
21 tasksQueueOptions: {
4e377863 22 concurrency: tasksConcurrency
594bfb84 23 },
8ebe6c30 24 errorHandler: (e) => console.error(e)
594bfb84
JB
25 }
26 )
e1ffb94f
JB
27 const emptyPool = new FixedThreadPool(
28 numberOfThreads,
29 './tests/worker-files/thread/emptyWorker.js',
73bfd59d 30 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
31 )
32 const echoPool = new FixedThreadPool(
33 numberOfThreads,
34 './tests/worker-files/thread/echoWorker.js'
35 )
36 const errorPool = new FixedThreadPool(
37 numberOfThreads,
38 './tests/worker-files/thread/errorWorker.js',
39 {
8ebe6c30 40 errorHandler: (e) => console.error(e)
e1ffb94f
JB
41 }
42 )
43 const asyncErrorPool = new FixedThreadPool(
44 numberOfThreads,
45 './tests/worker-files/thread/asyncErrorWorker.js',
46 {
8ebe6c30 47 errorHandler: (e) => console.error(e)
e1ffb94f
JB
48 }
49 )
50 const asyncPool = new FixedThreadPool(
51 numberOfThreads,
52 './tests/worker-files/thread/asyncWorker.js'
53 )
54
0e2503fc
JB
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool.destroy()
58 await asyncPool.destroy()
59 await errorPool.destroy()
7c0ba920 60 await asyncErrorPool.destroy()
0e2503fc 61 await emptyPool.destroy()
594bfb84 62 await queuePool.destroy()
0e2503fc
JB
63 })
64
506c2a14 65 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9 66 let result = await pool.execute({
dbca3be9 67 function: TaskFunctions.fibonacci
6db75ad9 68 })
024daf59 69 expect(result).toBe(75025)
6db75ad9 70 result = await pool.execute({
dbca3be9 71 function: TaskFunctions.factorial
6db75ad9 72 })
70a4f5ea 73 expect(result).toBe(9.33262154439441e157)
506c2a14 74 })
75
318d4156 76 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 77 const result = await pool.execute()
30b963d4 78 expect(result).toStrictEqual({ ok: 1 })
106744f7 79 })
80
1dcbfefc 81 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
82 const pool1 = new FixedThreadPool(
83 numberOfThreads,
84 './tests/worker-files/thread/testWorker.js',
85 {
8ebe6c30 86 errorHandler: (e) => console.error(e)
079de991
JB
87 }
88 )
89 let poolReady = 0
66293e7d 90 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
3e8611a8 91 await waitPoolEvents(pool1, PoolEvents.ready, 1)
216541b6
JB
92 expect(poolReady).toBe(1)
93 })
94
94407def
JB
95 it("Verify that 'busy' event is emitted", async () => {
96 const promises = new Set()
7c0ba920 97 let poolBusy = 0
aee46736 98 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 99 for (let i = 0; i < numberOfThreads * 2; i++) {
94407def 100 promises.add(pool.execute())
7c0ba920 101 }
94407def 102 await Promise.all(promises)
14916bf9
JB
103 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
104 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
105 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
106 })
107
594bfb84 108 it('Verify that tasks queuing is working', async () => {
d3d4b67d 109 const promises = new Set()
4e377863 110 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 111 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 112 promises.add(queuePool.execute())
594bfb84 113 }
d3d4b67d 114 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 115 for (const workerNode of queuePool.workerNodes) {
5bb5be17 116 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 117 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
118 queuePool.opts.tasksQueueOptions.concurrency
119 )
f59e1027 120 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
121 expect(workerNode.usage.tasks.queued).toBe(
122 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
123 )
124 expect(workerNode.usage.tasks.maxQueued).toBe(
125 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
126 )
1f0766e7 127 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 128 }
1f0766e7 129 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
130 expect(queuePool.info.executingTasks).toBe(
131 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
132 )
6b27d407 133 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
134 numberOfThreads *
135 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
136 )
137 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
138 numberOfThreads *
139 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 140 )
a1763c54 141 expect(queuePool.info.backPressure).toBe(false)
1f0766e7 142 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
143 await Promise.all(promises)
144 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
145 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
146 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
147 numberOfThreads * maxMultiplier
148 )
4e377863 149 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 150 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
151 expect(workerNode.usage.tasks.maxQueued).toBe(
152 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
153 )
1f0766e7
JB
154 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
155 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
156 numberOfThreads * maxMultiplier
157 )
594bfb84 158 }
1f0766e7
JB
159 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
160 expect(queuePool.info.backPressure).toBe(false)
161 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
162 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
163 numberOfThreads * maxMultiplier
164 )
594bfb84
JB
165 })
166
106744f7 167 it('Verify that is possible to have a worker that return undefined', async () => {
168 const result = await emptyPool.execute()
6db75ad9 169 expect(result).toBeUndefined()
106744f7 170 })
171
172 it('Verify that data are sent to the worker correctly', async () => {
173 const data = { f: 10 }
174 const result = await echoPool.execute(data)
e1ffb94f 175 expect(result).toStrictEqual(data)
106744f7 176 })
177
9ea61037 178 it('Verify that transferable objects are sent to the worker correctly', async () => {
9ea61037
JB
179 let error
180 let result
181 try {
f05314ff
JB
182 result = await pool.execute(undefined, undefined, [
183 new ArrayBuffer(16),
184 new MessageChannel().port1
185 ])
9ea61037
JB
186 } catch (e) {
187 error = e
188 }
189 expect(result).toStrictEqual({ ok: 1 })
190 expect(error).toBeUndefined()
f05314ff
JB
191 try {
192 result = await pool.execute(undefined, undefined, [
193 new SharedArrayBuffer(16)
194 ])
195 } catch (e) {
196 error = e
197 }
198 expect(result).toStrictEqual({ ok: 1 })
199 expect(error).toStrictEqual(
200 new TypeError('Found invalid object in transferList')
201 )
9ea61037
JB
202 })
203
7c0ba920 204 it('Verify that error handling is working properly:sync', async () => {
106744f7 205 const data = { f: 10 }
d46660cd 206 let taskError
8ebe6c30 207 errorPool.emitter.on(PoolEvents.taskError, (e) => {
d46660cd
JB
208 taskError = e
209 })
106744f7 210 let inError
211 try {
212 await errorPool.execute(data)
213 } catch (e) {
214 inError = e
215 }
7c0ba920
JB
216 expect(inError).toBeDefined()
217 expect(inError).toBeInstanceOf(Error)
218 expect(inError.message).toBeDefined()
8620fb25 219 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
220 expect(inError.message).toBe('Error Message from ThreadWorker')
221 expect(taskError).toStrictEqual({
ff128cc9 222 name: 'default',
985d0e79
JB
223 message: new Error('Error Message from ThreadWorker'),
224 data
225 })
18482cec
JB
226 expect(
227 errorPool.workerNodes.some(
8ebe6c30 228 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
229 )
230 ).toBe(true)
7c0ba920
JB
231 })
232
233 it('Verify that error handling is working properly:async', async () => {
234 const data = { f: 10 }
4f0b85b3 235 let taskError
8ebe6c30 236 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
4f0b85b3
JB
237 taskError = e
238 })
7c0ba920
JB
239 let inError
240 try {
241 await asyncErrorPool.execute(data)
242 } catch (e) {
243 inError = e
244 }
245 expect(inError).toBeDefined()
246 expect(inError).toBeInstanceOf(Error)
247 expect(inError.message).toBeDefined()
8620fb25 248 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
249 expect(inError.message).toBe('Error Message from ThreadWorker:async')
250 expect(taskError).toStrictEqual({
ff128cc9 251 name: 'default',
985d0e79
JB
252 message: new Error('Error Message from ThreadWorker:async'),
253 data
254 })
18482cec
JB
255 expect(
256 asyncErrorPool.workerNodes.some(
8ebe6c30 257 (workerNode) => workerNode.usage.tasks.failed === 1
18482cec
JB
258 )
259 ).toBe(true)
106744f7 260 })
261
7784f548 262 it('Verify that async function is working properly', async () => {
263 const data = { f: 10 }
15e5141f 264 const startTime = performance.now()
7784f548 265 const result = await asyncPool.execute(data)
15e5141f 266 const usedTime = performance.now() - startTime
e1ffb94f 267 expect(result).toStrictEqual(data)
32d490eb 268 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 269 })
270
506c2a14 271 it('Shutdown test', async () => {
bac873bd 272 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
ef3891a3
JB
273 let poolDestroy = 0
274 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
1f9a5a44 275 await pool.destroy()
bdacc2d2
JB
276 const numberOfExitEvents = await exitPromise
277 expect(numberOfExitEvents).toBe(numberOfThreads)
ef3891a3 278 expect(poolDestroy).toBe(1)
506c2a14 279 })
280
90082c8c 281 it('Verify that thread pool options are checked', async () => {
f59e1027 282 const workerFilePath = './tests/worker-files/thread/testWorker.js'
90082c8c
JB
283 let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
284 expect(pool1.opts.workerOptions).toBeUndefined()
285 await pool1.destroy()
286 pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
287 workerOptions: {
288 env: { TEST: 'test' },
289 name: 'test'
290 }
291 })
292 expect(pool1.opts.workerOptions).toStrictEqual({
293 env: { TEST: 'test' },
294 name: 'test'
295 })
296 await pool1.destroy()
297 })
298
506c2a14 299 it('Should work even without opts in input', async () => {
76b1e974 300 const pool1 = new FixedThreadPool(
e1ffb94f 301 numberOfThreads,
76b1e974
S
302 './tests/worker-files/thread/testWorker.js'
303 )
6db75ad9 304 const res = await pool1.execute()
30b963d4 305 expect(res).toStrictEqual({ ok: 1 })
0e2503fc
JB
306 // We need to clean up the resources after our test
307 await pool1.destroy()
506c2a14 308 })
8d3782fa
JB
309
310 it('Verify that a pool with zero worker fails', async () => {
311 expect(
312 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
2431bdb4 313 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 314 })
506c2a14 315})