fix: fix back pressure detection
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
dbca3be9 3const { TaskFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
079de991 6describe('Fixed thread pool test suite', () => {
// Sizing shared by every pool created at suite level.
const numberOfThreads = 6
const tasksConcurrency = 2
// Error handler handed to the pools: prints the worker error on stderr.
const logWorkerError = (e) => console.error(e)

// Pool running the default test worker, without tasks queuing.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)
// Same worker file, with the tasks queue enabled and a per-worker concurrency.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool running emptyWorker.js; logs a message whenever one of its workers exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)
// Pool running echoWorker.js, created with default options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)
// Pool running errorWorker.js.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool running asyncErrorWorker.js.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)
// Pool running asyncWorker.js, created with default options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
54
0e2503fc
JB
after('Destroy all pools', async () => {
  // Release the resources held by the suite-level pools, one pool at a time.
  // `pool` is not listed: the 'Shutdown test' destroys it.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const suitePool of poolsToDestroy) {
    await suitePool.destroy()
  }
})
64
it('Verify that the function is executed in a worker thread', async () => {
  // Run the fibonacci task function and check the value returned by the worker.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  // Same check for the factorial task function.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
75
it('Verify that is possible to invoke the execute() method without input', async () => {
  // Calling execute() with no argument must still resolve with the worker response.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
80
it("Verify that 'ready' event is emitted", async () => {
  // Dedicated pool: the listener is attached to this pool's own emitter.
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  let poolReady = 0
  pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
  try {
    await waitPoolEvents(pool1, 'ready', 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always destroy the local pool, even when an assertion fails, so its
    // worker threads do not outlive the test.
    await pool1.destroy()
  }
})
94
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  // The counter is checked right after the submission loop, before any task is awaited.
  expect(poolBusy).toBe(numberOfThreads + 1)
  // Wait for the submitted tasks to settle instead of leaving their promises
  // floating: later tests reuse `pool` and must not start with tasks in flight.
  await Promise.all(promises)
})
105
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfThreads * maxMultiplier
  const promises = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks expected to wait in each worker queue once `concurrency` tasks are executing.
  const queuedPerWorker = maxMultiplier - concurrency

  // Per-worker usage right after submission, before any task is awaited.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }

  // Pool-wide counters at the same point in time.
  const { executingTasks, queuedTasks, maxQueuedTasks, backPressure } =
    queuePool.info
  expect(executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(backPressure).toBe(false)

  await Promise.all(promises)

  // Once every task has settled, the queues are drained and each worker has run its share.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
})
148
it('Verify that is possible to have a worker that return undefined', async () => {
  // A task run by the empty worker pool must resolve with undefined.
  expect(await emptyPool.execute()).toBeUndefined()
})
153
it('Verify that data are sent to the worker correctly', async () => {
  // The echo worker pool is expected to resolve with a value equal to its input.
  const payload = { f: 10 }
  expect(await echoPool.execute(payload)).toStrictEqual(payload)
})
159
it('Verify that transferable objects are sent to the worker correctly', async () => {
  // Phase 1: a transfer list made of an ArrayBuffer and a MessagePort is accepted.
  let transferResult
  let transferError
  try {
    transferResult = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    transferError = e
  }
  expect(transferResult).toStrictEqual({ ok: 1 })
  expect(transferError).toBeUndefined()

  // Phase 2: a SharedArrayBuffer in the transfer list must make execute() fail.
  // Dedicated variables are used so the assertions below cannot be satisfied
  // by values left over from phase 1.
  let sharedResult
  let sharedError
  try {
    sharedResult = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    sharedError = e
  }
  // The failed call must not have produced any result.
  expect(sharedResult).toBeUndefined()
  expect(sharedError).toStrictEqual(
    new TypeError('Found invalid object in transferList')
  )
})
185
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker'

  // Capture the payload of the pool `taskError` event.
  let taskError
  errorPool.emitter.on(PoolEvents.taskError, (emittedError) => {
    taskError = emittedError
  })

  // Capture the error raised by execute().
  let inError
  try {
    await errorPool.execute(data)
  } catch (caught) {
    inError = caught
  }

  // Error seen by the caller of execute().
  expect(inError).toBeDefined()
  expect(inError).toBeInstanceOf(Error)
  expect(inError.message).toBeDefined()
  expect(typeof inError.message).toBe('string')
  expect(inError.message).toBe(expectedMessage)

  // Payload seen by the `taskError` listener.
  expect(taskError).toStrictEqual({
    name: 'default',
    message: new Error(expectedMessage),
    data
  })

  // At least one worker node must have accounted for exactly one failed task.
  const hasFailedTask = errorPool.workerNodes.some(
    (workerNode) => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
214
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker:async'

  // Capture the payload of the pool `taskError` event.
  let taskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, (emittedError) => {
    taskError = emittedError
  })

  // Capture the error raised by execute().
  let inError
  try {
    await asyncErrorPool.execute(data)
  } catch (caught) {
    inError = caught
  }

  // Error seen by the caller of execute().
  expect(inError).toBeDefined()
  expect(inError).toBeInstanceOf(Error)
  expect(inError.message).toBeDefined()
  expect(typeof inError.message).toBe('string')
  expect(inError.message).toBe(expectedMessage)

  // Payload seen by the `taskError` listener.
  expect(taskError).toStrictEqual({
    name: 'default',
    message: new Error(expectedMessage),
    data
  })

  // At least one worker node must have accounted for exactly one failed task.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    (workerNode) => workerNode.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
243
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure how long the task takes from submission to resolution.
  const startedAt = performance.now()
  const result = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(result).toStrictEqual(payload)
  // The task is expected to take at least 2000 ms to resolve.
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
252
it('Shutdown test', async () => {
  // Count the `destroy` events emitted by the pool.
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  // Start waiting for the worker `exit` events before triggering the shutdown.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  await pool.destroy()
  // One `exit` event per worker, and a single pool `destroy` event.
  expect(await exitPromise).toBe(numberOfThreads)
  expect(poolDestroy).toBe(1)
})
262
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // Without options, no worker options are set on the pool.
  const defaultPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(defaultPool.opts.workerOptions).toBeUndefined()
  await defaultPool.destroy()

  // Worker options given at construction are exposed unchanged on the pool.
  const customPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  // Compared against a fresh literal rather than the object handed to the pool.
  const expectedWorkerOptions = {
    env: { TEST: 'test' },
    name: 'test'
  }
  expect(customPool.opts.workerOptions).toStrictEqual(expectedWorkerOptions)
  await customPool.destroy()
})
280
it('Should work even without opts in input', async () => {
  // Build the pool with only the two mandatory constructor arguments.
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  const pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(await pool1.execute()).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await pool1.destroy()
})
8d3782fa
JB
291
it('Verify that a pool with zero worker fails', async () => {
  // Wrapped in a function so expect() can observe the constructor throwing.
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  expect(createZeroWorkerPool).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
506c2a14 297})