chore: v2.6.10
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
216541b6 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
216541b6 6describe('Fixed thread pool test suite', async () => {
e1ffb94f
JB
7 const numberOfThreads = 6
8 const pool = new FixedThreadPool(
9 numberOfThreads,
10 './tests/worker-files/thread/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
216541b6
JB
15 let poolReady = 0
16 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
17 await waitPoolEvents(pool, PoolEvents.ready, 1)
594bfb84
JB
18 const queuePool = new FixedThreadPool(
19 numberOfThreads,
20 './tests/worker-files/thread/testWorker.js',
21 {
22 enableTasksQueue: true,
23 tasksQueueOptions: {
24 concurrency: 2
25 },
26 errorHandler: e => console.error(e)
27 }
28 )
e1ffb94f
JB
29 const emptyPool = new FixedThreadPool(
30 numberOfThreads,
31 './tests/worker-files/thread/emptyWorker.js',
32 { exitHandler: () => console.log('empty pool worker exited') }
33 )
34 const echoPool = new FixedThreadPool(
35 numberOfThreads,
36 './tests/worker-files/thread/echoWorker.js'
37 )
38 const errorPool = new FixedThreadPool(
39 numberOfThreads,
40 './tests/worker-files/thread/errorWorker.js',
41 {
42 errorHandler: e => console.error(e)
43 }
44 )
45 const asyncErrorPool = new FixedThreadPool(
46 numberOfThreads,
47 './tests/worker-files/thread/asyncErrorWorker.js',
48 {
49 errorHandler: e => console.error(e)
50 }
51 )
52 const asyncPool = new FixedThreadPool(
53 numberOfThreads,
54 './tests/worker-files/thread/asyncWorker.js'
55 )
56
0e2503fc
JB
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
7c0ba920 62 await asyncErrorPool.destroy()
0e2503fc 63 await emptyPool.destroy()
594bfb84 64 await queuePool.destroy()
0e2503fc
JB
65 })
66
506c2a14 67 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9
JB
68 let result = await pool.execute({
69 function: WorkerFunctions.fibonacci
70 })
024daf59 71 expect(result).toBe(75025)
6db75ad9
JB
72 result = await pool.execute({
73 function: WorkerFunctions.factorial
74 })
70a4f5ea 75 expect(result).toBe(9.33262154439441e157)
506c2a14 76 })
77
318d4156 78 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 79 const result = await pool.execute()
30b963d4 80 expect(result).toStrictEqual({ ok: 1 })
106744f7 81 })
82
216541b6
JB
83 it("Verify that 'ready' event is emitted", async () => {
84 expect(poolReady).toBe(1)
85 })
86
aee46736 87 it("Verify that 'busy' event is emitted", async () => {
7c0ba920 88 let poolBusy = 0
aee46736 89 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 90 for (let i = 0; i < numberOfThreads * 2; i++) {
8cbb82eb 91 pool.execute()
7c0ba920 92 }
14916bf9
JB
93 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
94 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
95 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
96 })
97
594bfb84 98 it('Verify that tasks queuing is working', async () => {
d3d4b67d 99 const promises = new Set()
ee9f5295 100 const maxMultiplier = 2
594bfb84 101 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 102 promises.add(queuePool.execute())
594bfb84 103 }
d3d4b67d 104 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 105 for (const workerNode of queuePool.workerNodes) {
f59e1027 106 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
107 queuePool.opts.tasksQueueOptions.concurrency
108 )
f59e1027
JB
109 expect(workerNode.usage.tasks.executed).toBe(0)
110 expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
111 expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
594bfb84 112 }
a4e07f72 113 expect(queuePool.info.executingTasks).toBe(numberOfThreads)
6b27d407
JB
114 expect(queuePool.info.queuedTasks).toBe(
115 numberOfThreads * maxMultiplier - numberOfThreads
116 )
117 expect(queuePool.info.maxQueuedTasks).toBe(
d3d4b67d
JB
118 numberOfThreads * maxMultiplier - numberOfThreads
119 )
594bfb84
JB
120 await Promise.all(promises)
121 for (const workerNode of queuePool.workerNodes) {
f59e1027
JB
122 expect(workerNode.usage.tasks.executing).toBe(0)
123 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
124 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
125 expect(workerNode.usage.tasks.queued).toBe(0)
126 expect(workerNode.usage.tasks.maxQueued).toBe(1)
594bfb84
JB
127 }
128 })
129
106744f7 130 it('Verify that is possible to have a worker that return undefined', async () => {
131 const result = await emptyPool.execute()
6db75ad9 132 expect(result).toBeUndefined()
106744f7 133 })
134
135 it('Verify that data are sent to the worker correctly', async () => {
136 const data = { f: 10 }
137 const result = await echoPool.execute(data)
e1ffb94f 138 expect(result).toStrictEqual(data)
106744f7 139 })
140
7c0ba920 141 it('Verify that error handling is working properly:sync', async () => {
106744f7 142 const data = { f: 10 }
d46660cd
JB
143 let taskError
144 errorPool.emitter.on(PoolEvents.taskError, e => {
145 taskError = e
146 })
106744f7 147 let inError
148 try {
149 await errorPool.execute(data)
150 } catch (e) {
151 inError = e
152 }
7c0ba920
JB
153 expect(inError).toBeDefined()
154 expect(inError).toBeInstanceOf(Error)
155 expect(inError.message).toBeDefined()
8620fb25 156 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
157 expect(inError.message).toBe('Error Message from ThreadWorker')
158 expect(taskError).toStrictEqual({
985d0e79
JB
159 message: new Error('Error Message from ThreadWorker'),
160 data
161 })
18482cec
JB
162 expect(
163 errorPool.workerNodes.some(
f59e1027 164 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
165 )
166 ).toBe(true)
7c0ba920
JB
167 })
168
169 it('Verify that error handling is working properly:async', async () => {
170 const data = { f: 10 }
4f0b85b3
JB
171 let taskError
172 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
173 taskError = e
174 })
7c0ba920
JB
175 let inError
176 try {
177 await asyncErrorPool.execute(data)
178 } catch (e) {
179 inError = e
180 }
181 expect(inError).toBeDefined()
182 expect(inError).toBeInstanceOf(Error)
183 expect(inError.message).toBeDefined()
8620fb25 184 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
185 expect(inError.message).toBe('Error Message from ThreadWorker:async')
186 expect(taskError).toStrictEqual({
985d0e79
JB
187 message: new Error('Error Message from ThreadWorker:async'),
188 data
189 })
18482cec
JB
190 expect(
191 asyncErrorPool.workerNodes.some(
f59e1027 192 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
193 )
194 ).toBe(true)
106744f7 195 })
196
7784f548 197 it('Verify that async function is working properly', async () => {
198 const data = { f: 10 }
15e5141f 199 const startTime = performance.now()
7784f548 200 const result = await asyncPool.execute(data)
15e5141f 201 const usedTime = performance.now() - startTime
e1ffb94f 202 expect(result).toStrictEqual(data)
32d490eb 203 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 204 })
205
506c2a14 206 it('Shutdown test', async () => {
bac873bd 207 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
1f9a5a44 208 await pool.destroy()
bdacc2d2
JB
209 const numberOfExitEvents = await exitPromise
210 expect(numberOfExitEvents).toBe(numberOfThreads)
506c2a14 211 })
212
90082c8c 213 it('Verify that thread pool options are checked', async () => {
f59e1027 214 const workerFilePath = './tests/worker-files/thread/testWorker.js'
90082c8c
JB
215 let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
216 expect(pool1.opts.workerOptions).toBeUndefined()
217 await pool1.destroy()
218 pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
219 workerOptions: {
220 env: { TEST: 'test' },
221 name: 'test'
222 }
223 })
224 expect(pool1.opts.workerOptions).toStrictEqual({
225 env: { TEST: 'test' },
226 name: 'test'
227 })
228 await pool1.destroy()
229 })
230
506c2a14 231 it('Should work even without opts in input', async () => {
76b1e974 232 const pool1 = new FixedThreadPool(
e1ffb94f 233 numberOfThreads,
76b1e974
S
234 './tests/worker-files/thread/testWorker.js'
235 )
6db75ad9 236 const res = await pool1.execute()
30b963d4 237 expect(res).toStrictEqual({ ok: 1 })
0e2503fc
JB
238 // We need to clean up the resources after our test
239 await pool1.destroy()
506c2a14 240 })
8d3782fa
JB
241
242 it('Verify that a pool with zero worker fails', async () => {
243 expect(
244 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
2431bdb4 245 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 246 })
506c2a14 247})