build: cleanup TS configuration
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
6cd5248f 5const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
// Sizing shared by every fixture pool of this suite.
const numberOfThreads = 6
// Value given to the queue pool's `tasksQueueOptions.concurrency`.
const tasksConcurrency = 2

// Error handler shared by the fixture pools: prints the error on stderr.
const logWorkerError = (e) => console.error(e)

// Default pool running the generic test worker.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)

// Same worker file, with the tasks queue enabled.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)

// Pool whose tasks are expected to resolve to `undefined`.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool whose tasks are expected to resolve to the data they were given.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pools used by the synchronous / asynchronous error handling tests.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool used by the asynchronous task function test.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
55
0e2503fc
JB
after('Destroy all pools', async () => {
  // Release the resources held by the shared pools once the suite is done.
  // The pools are destroyed one after the other, in this order. `pool` is
  // not listed here: the 'Shutdown test' case destroys it.
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})
65
it('Verify that the function is executed in a worker thread', async () => {
  // Ask the test worker to run each task function in turn and compare the
  // value returned by the pool with the expected constant.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no task data at all.
  const outcome = await pool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is needed: the listener must be attached before the
  // pool gets a chance to emit its `ready` event.
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // This pool is local to the test and is not covered by the suite-level
    // `after` hook, so it has to be destroyed here, even when an assertion
    // above fails, otherwise its worker threads are leaked.
    await pool1.destroy()
  }
})
95
94407def
JB
it("Verify that 'busy' event is emitted", async () => {
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })

  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = []
  for (let submitted = 0; submitted < numberOfThreads * 2; submitted++) {
    submittedTasks.push(pool.execute())
  }
  await Promise.all(submittedTasks)

  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfThreads + 1)
})
108
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasksCount = numberOfThreads * maxMultiplier

  // Submit every task at once, without awaiting any of them.
  const promises = new Set()
  let submitted = 0
  while (submitted < submittedTasksCount) {
    promises.add(queuePool.execute())
    submitted++
  }
  expect(promises.size).toBe(submittedTasksCount)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks expected to wait in each worker queue while `concurrency` tasks run.
  const expectedQueuedPerWorker = maxMultiplier - concurrency

  // Phase 1: nothing has completed yet.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(expectedQueuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(expectedQueuedPerWorker)
    expect(usage.tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(
    numberOfThreads * expectedQueuedPerWorker
  )
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfThreads * expectedQueuedPerWorker
  )
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(promises)

  // Phase 2: every submitted task has settled and the queues are drained.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(submittedTasksCount)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(expectedQueuedPerWorker)
    expect(usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.stolen).toBeLessThanOrEqual(submittedTasksCount)
  }
  expect(queuePool.info.executedTasks).toBe(submittedTasksCount)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(submittedTasksCount)
})
167
it('Verify that is possible to have a worker that return undefined', async () => {
  // The empty worker's task is expected to resolve to `undefined`.
  const emptyWorkerResult = await emptyPool.execute()
  expect(emptyWorkerResult).toBeUndefined()
})
172
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to resolve with a structural copy of its input.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
178
it('Verify that transferable objects are sent to the worker correctly', async () => {
  let error
  let result
  // Phase 1: a transfer list made of valid transferable objects is accepted.
  try {
    result = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    error = e
  }
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toBeUndefined()

  // Phase 2: a SharedArrayBuffer in the transfer list is expected to be rejected.
  // Reset both variables first, otherwise the assertions below would only
  // re-check the values left over from phase 1.
  result = undefined
  error = undefined
  try {
    result = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    error = e
  }
  // The failed call must not have produced any result.
  expect(result).toBeUndefined()
  expect(error).toStrictEqual(
    new TypeError('Found invalid object in transferList')
  )
})
204
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }

  // Capture the payload of the `taskError` event emitted by the pool.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedTaskError = e
  })

  // Capture the error thrown by execute().
  let thrownError
  try {
    await errorPool.execute(payload)
  } catch (e) {
    thrownError = e
  }

  expect(thrownError).toBeDefined()
  expect(thrownError).toBeInstanceOf(Error)
  expect(thrownError.message).toBeDefined()
  expect(typeof thrownError.message).toBe('string')
  expect(thrownError.message).toBe('Error Message from ThreadWorker')

  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker'),
    data: payload
  })

  // At least one worker node must have accounted for exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
233
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }

  // Capture the payload of the `taskError` event emitted by the pool.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
    emittedTaskError = e
  })

  // Capture the error thrown by execute().
  let thrownError
  try {
    await asyncErrorPool.execute(payload)
  } catch (e) {
    thrownError = e
  }

  expect(thrownError).toBeDefined()
  expect(thrownError).toBeInstanceOf(Error)
  expect(thrownError.message).toBeDefined()
  expect(typeof thrownError.message).toBe('string')
  expect(thrownError.message).toBe('Error Message from ThreadWorker:async')

  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker:async'),
    data: payload
  })

  // At least one worker node must have accounted for exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
262
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }

  // Measure the wall-clock duration of one task round trip, in milliseconds.
  const startedAt = performance.now()
  const result = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt

  expect(result).toStrictEqual(payload)
  // The task is expected to take at least 2000 ms to resolve.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
271
it('Shutdown test', async () => {
  // Start waiting for the worker `exit` events before destroying the pool.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)

  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })

  await pool.destroy()

  // One `exit` event per worker, and a single pool `destroy` event.
  expect(await exitPromise).toBe(numberOfThreads)
  expect(destroyEventCount).toBe(1)
})
281
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // Without options, no worker options are expected on the pool.
  const defaultOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(defaultOptsPool.opts.workerOptions).toBeUndefined()
  await defaultOptsPool.destroy()

  // Worker options given at construction are expected to be kept as is.
  const workerOptions = {
    env: { TEST: 'test' },
    name: 'test'
  }
  const customOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions
  })
  // Compare against a fresh literal rather than the object passed in.
  expect(customOptsPool.opts.workerOptions).toStrictEqual({
    env: { TEST: 'test' },
    name: 'test'
  })
  await customOptsPool.destroy()
})
299
it('Should work even without opts in input', async () => {
  // Build a pool from the worker count and the worker file path only.
  const optionlessPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(await optionlessPool.execute()).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
8d3782fa
JB
310
it('Verify that a pool with zero worker fails', () => {
  // The constructor is expected to throw synchronously, so the test needs
  // no `async`. `toThrow` is the canonical matcher name; `toThrowError` is
  // only an alias of it.
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})
506c2a14 316})