fix: fix tasks queued count computation
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
85a3f8a7 4const TestUtils = require('../../test-utils')
506c2a14 5
a35560ba 6describe('Fixed thread pool test suite', () => {
e1ffb94f
JB
7 const numberOfThreads = 6
8 const pool = new FixedThreadPool(
9 numberOfThreads,
10 './tests/worker-files/thread/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
594bfb84
JB
15 const queuePool = new FixedThreadPool(
16 numberOfThreads,
17 './tests/worker-files/thread/testWorker.js',
18 {
19 enableTasksQueue: true,
20 tasksQueueOptions: {
21 concurrency: 2
22 },
23 errorHandler: e => console.error(e)
24 }
25 )
e1ffb94f
JB
26 const emptyPool = new FixedThreadPool(
27 numberOfThreads,
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler: () => console.log('empty pool worker exited') }
30 )
31 const echoPool = new FixedThreadPool(
32 numberOfThreads,
33 './tests/worker-files/thread/echoWorker.js'
34 )
35 const errorPool = new FixedThreadPool(
36 numberOfThreads,
37 './tests/worker-files/thread/errorWorker.js',
38 {
39 errorHandler: e => console.error(e)
40 }
41 )
42 const asyncErrorPool = new FixedThreadPool(
43 numberOfThreads,
44 './tests/worker-files/thread/asyncErrorWorker.js',
45 {
46 errorHandler: e => console.error(e)
47 }
48 )
49 const asyncPool = new FixedThreadPool(
50 numberOfThreads,
51 './tests/worker-files/thread/asyncWorker.js'
52 )
53
0e2503fc
JB
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool.destroy()
57 await asyncPool.destroy()
58 await errorPool.destroy()
7c0ba920 59 await asyncErrorPool.destroy()
0e2503fc 60 await emptyPool.destroy()
594bfb84 61 await queuePool.destroy()
0e2503fc
JB
62 })
63
506c2a14 64 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9
JB
65 let result = await pool.execute({
66 function: WorkerFunctions.fibonacci
67 })
70a4f5ea 68 expect(result).toBe(121393)
6db75ad9
JB
69 result = await pool.execute({
70 function: WorkerFunctions.factorial
71 })
70a4f5ea 72 expect(result).toBe(9.33262154439441e157)
506c2a14 73 })
74
318d4156 75 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 76 const result = await pool.execute()
6db75ad9 77 expect(result).toBe(false)
106744f7 78 })
79
aee46736 80 it("Verify that 'busy' event is emitted", async () => {
7c0ba920 81 let poolBusy = 0
aee46736 82 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 83 for (let i = 0; i < numberOfThreads * 2; i++) {
8cbb82eb 84 pool.execute()
7c0ba920 85 }
14916bf9
JB
86 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
87 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
88 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
89 })
90
594bfb84 91 it('Verify that tasks queuing is working', async () => {
d3d4b67d 92 const promises = new Set()
ee9f5295 93 const maxMultiplier = 2
594bfb84 94 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 95 promises.add(queuePool.execute())
594bfb84 96 }
d3d4b67d 97 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 98 for (const workerNode of queuePool.workerNodes) {
a4e07f72 99 expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
100 queuePool.opts.tasksQueueOptions.concurrency
101 )
a4e07f72 102 expect(workerNode.workerUsage.tasks.executed).toBe(0)
9c16fb4b 103 expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0)
594bfb84 104 }
a4e07f72 105 expect(queuePool.info.executingTasks).toBe(numberOfThreads)
6b27d407
JB
106 expect(queuePool.info.queuedTasks).toBe(
107 numberOfThreads * maxMultiplier - numberOfThreads
108 )
109 expect(queuePool.info.maxQueuedTasks).toBe(
d3d4b67d
JB
110 numberOfThreads * maxMultiplier - numberOfThreads
111 )
594bfb84
JB
112 await Promise.all(promises)
113 for (const workerNode of queuePool.workerNodes) {
a4e07f72
JB
114 expect(workerNode.workerUsage.tasks.executing).toBe(0)
115 expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
116 expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
117 maxMultiplier
118 )
9c16fb4b 119 expect(workerNode.workerUsage.tasks.queued).toBe(0)
594bfb84
JB
120 }
121 })
122
106744f7 123 it('Verify that is possible to have a worker that return undefined', async () => {
124 const result = await emptyPool.execute()
6db75ad9 125 expect(result).toBeUndefined()
106744f7 126 })
127
128 it('Verify that data are sent to the worker correctly', async () => {
129 const data = { f: 10 }
130 const result = await echoPool.execute(data)
e1ffb94f 131 expect(result).toStrictEqual(data)
106744f7 132 })
133
7c0ba920 134 it('Verify that error handling is working properly:sync', async () => {
106744f7 135 const data = { f: 10 }
d46660cd
JB
136 let taskError
137 errorPool.emitter.on(PoolEvents.taskError, e => {
138 taskError = e
139 })
106744f7 140 let inError
141 try {
142 await errorPool.execute(data)
143 } catch (e) {
144 inError = e
145 }
7c0ba920
JB
146 expect(inError).toBeDefined()
147 expect(inError).toBeInstanceOf(Error)
148 expect(inError.message).toBeDefined()
8620fb25 149 expect(typeof inError.message === 'string').toBe(true)
0302f8ec 150 expect(inError.message).toBe('Error Message from ThreadWorker')
d46660cd 151 expect(taskError).toStrictEqual({
82f36766
JB
152 message: new Error('Error Message from ThreadWorker'),
153 data
d46660cd 154 })
18482cec
JB
155 expect(
156 errorPool.workerNodes.some(
a4e07f72 157 workerNode => workerNode.workerUsage.tasks.failed === 1
18482cec
JB
158 )
159 ).toBe(true)
7c0ba920
JB
160 })
161
162 it('Verify that error handling is working properly:async', async () => {
163 const data = { f: 10 }
4f0b85b3
JB
164 let taskError
165 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
166 taskError = e
167 })
7c0ba920
JB
168 let inError
169 try {
170 await asyncErrorPool.execute(data)
171 } catch (e) {
172 inError = e
173 }
174 expect(inError).toBeDefined()
175 expect(inError).toBeInstanceOf(Error)
176 expect(inError.message).toBeDefined()
8620fb25 177 expect(typeof inError.message === 'string').toBe(true)
0302f8ec 178 expect(inError.message).toBe('Error Message from ThreadWorker:async')
4f0b85b3 179 expect(taskError).toStrictEqual({
82f36766
JB
180 message: new Error('Error Message from ThreadWorker:async'),
181 data
4f0b85b3 182 })
18482cec
JB
183 expect(
184 asyncErrorPool.workerNodes.some(
a4e07f72 185 workerNode => workerNode.workerUsage.tasks.failed === 1
18482cec
JB
186 )
187 ).toBe(true)
106744f7 188 })
189
7784f548 190 it('Verify that async function is working properly', async () => {
191 const data = { f: 10 }
15e5141f 192 const startTime = performance.now()
7784f548 193 const result = await asyncPool.execute(data)
15e5141f 194 const usedTime = performance.now() - startTime
e1ffb94f 195 expect(result).toStrictEqual(data)
32d490eb 196 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 197 })
198
506c2a14 199 it('Shutdown test', async () => {
2c039e43
JB
200 const exitPromise = TestUtils.waitWorkerEvents(
201 pool,
202 'exit',
203 numberOfThreads
204 )
1f9a5a44 205 await pool.destroy()
bdacc2d2
JB
206 const numberOfExitEvents = await exitPromise
207 expect(numberOfExitEvents).toBe(numberOfThreads)
506c2a14 208 })
209
90082c8c
JB
210 it('Verify that thread pool options are checked', async () => {
211 const workerFilePath = './tests/worker-files/cluster/testWorker.js'
212 let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
213 expect(pool1.opts.workerOptions).toBeUndefined()
214 await pool1.destroy()
215 pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
216 workerOptions: {
217 env: { TEST: 'test' },
218 name: 'test'
219 }
220 })
221 expect(pool1.opts.workerOptions).toStrictEqual({
222 env: { TEST: 'test' },
223 name: 'test'
224 })
225 await pool1.destroy()
226 })
227
506c2a14 228 it('Should work even without opts in input', async () => {
76b1e974 229 const pool1 = new FixedThreadPool(
e1ffb94f 230 numberOfThreads,
76b1e974
S
231 './tests/worker-files/thread/testWorker.js'
232 )
6db75ad9
JB
233 const res = await pool1.execute()
234 expect(res).toBe(false)
0e2503fc
JB
235 // We need to clean up the resources after our test
236 await pool1.destroy()
506c2a14 237 })
8d3782fa
JB
238
239 it('Verify that a pool with zero worker fails', async () => {
240 expect(
241 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
d4aeae5a 242 ).toThrowError('Cannot instantiate a fixed pool with no worker')
8d3782fa 243 })
506c2a14 244})