fix: fix pool readiness status at startup
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
cc3ab78b 4const { waitWorkerEvents } = require('../../test-utils')
506c2a14 5
079de991 6describe('Fixed thread pool test suite', () => {
e1ffb94f
JB
// Number of worker threads requested for every fixed pool in this suite.
const numberOfThreads = 6

// Error handler shared by the pools that register one: print the error.
const logError = e => console.error(e)

// Pool used by the generic execution tests, the 'busy' event test and the
// shutdown test.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logError }
)

// Same worker file, with the tasks queue enabled and a concurrency of 2 in
// the tasks queue options.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    errorHandler: logError
  }
)

// Pool backed by emptyWorker.js; logs a message from its exit handler.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  {
    exitHandler: () => console.log('empty pool worker exited')
  }
)

// Pool backed by echoWorker.js, created without any options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pools backed by the error worker files (sync and async variants).
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logError }
)

// Pool backed by asyncWorker.js, created without any options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
53
0e2503fc
JB
// Suite teardown: release the resources held by the shared pools, destroying
// them one after the other in a fixed order. `pool` is not part of the list;
// the 'Shutdown test' destroys it.
after('Destroy all pools', async () => {
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
63
// Submits the fibonacci and then the factorial worker function to the shared
// pool and checks the value each task resolves with.
it('Verify that the function is executed in a worker thread', async () => {
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
74
// Calling execute() with no argument must still resolve, here with { ok: 1 }.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const expectedResult = { ok: 1 }
  expect(await pool.execute()).toStrictEqual(expectedResult)
})
79
// Checks that a freshly created pool emits the 'ready' event with a payload.
//
// The test waits for the listener to be invoked before asserting: assertions
// made in the same tick as the listener registration cannot observe an event
// that is delivered later. The dedicated pool is destroyed in `finally` so it
// does not outlive the test, whatever the outcome of the assertions.
//
// The test is still skipped, as it was before this change.
// NOTE(review): this assumes the event is emitted after the listener has been
// attached; if it never fires, the test ends on the runner timeout. Confirm
// before removing `.skip`.
it.skip("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    let poolInfo
    let poolReady = 0
    await new Promise(resolve => {
      pool1.emitter.on(PoolEvents.ready, info => {
        ++poolReady
        poolInfo = info
        resolve()
      })
    })
    expect(poolReady).toBe(1)
    expect(poolInfo).toBeDefined()
  } finally {
    await pool1.destroy()
  }
})
97
// Counts the 'busy' events emitted while twice as many tasks as there are
// workers are submitted at once to the shared pool.
//
// The promises returned by execute() are collected and awaited before the
// test returns, so no task is left running (and no promise left floating)
// when the next test starts using the same pool.
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  try {
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
    // The counter is checked right after submission, before any task is awaited.
    expect(poolBusy).toBe(numberOfThreads + 1)
  } finally {
    // Drain the submitted tasks even when the assertion above fails.
    await Promise.all(promises)
  }
})
108
// Submits maxMultiplier tasks per worker to the queue-enabled pool, then
// checks the per-worker and pool-level task counters twice: right after
// submission and once every task has settled.
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfThreads * maxMultiplier
  const pendingTasks = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    pendingTasks.add(queuePool.execute())
  }
  expect(pendingTasks.size).toBe(submittedTasks)

  // Counters right after submission: nothing executed yet, something queued
  // on every worker node.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBeGreaterThan(0)
    expect(tasks.maxQueued).toBeGreaterThan(0)
  }
  const expectedQueuedTasks = submittedTasks - numberOfThreads
  expect(queuePool.info.executingTasks).toBe(numberOfThreads)
  expect(queuePool.info.queuedTasks).toBe(expectedQueuedTasks)
  expect(queuePool.info.maxQueuedTasks).toBe(expectedQueuedTasks)

  await Promise.all(pendingTasks)

  // Counters once all tasks have settled: queues drained, nothing executing.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBeGreaterThan(0)
    expect(tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(1)
  }
})
140
// A task run by the emptyWorker.js pool must resolve with undefined.
it('Verify that is possible to have a worker that return undefined', async () => {
  expect(await emptyPool.execute()).toBeUndefined()
})
145
// The echoWorker.js pool must resolve with a value strictly equal to the
// data that was submitted.
it('Verify that data are sent to the worker correctly', async () => {
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
151
// Runs a task on the errorWorker.js pool and checks the three places where
// the failure must show up: the rejection of execute(), the payload of the
// taskError event, and the `failed` counter of at least one worker node.
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }

  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, eventPayload => {
    emittedTaskError = eventPayload
  })

  let rejection
  try {
    await errorPool.execute(payload)
  } catch (error) {
    rejection = error
  }

  // Rejection seen by the caller of execute().
  expect(rejection).toBeDefined()
  expect(rejection).toBeInstanceOf(Error)
  expect(rejection.message).toBeDefined()
  expect(typeof rejection.message).toBe('string')
  expect(rejection.message).toBe('Error Message from ThreadWorker')

  // Payload received by the taskError listener.
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: new Error('Error Message from ThreadWorker'),
    data: payload
  })

  // Failure accounted for on one of the worker nodes.
  const hasFailedTask = errorPool.workerNodes.some(
    node => node.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
180
// Same checks as the sync error test, against the asyncErrorWorker.js pool:
// rejection of execute(), payload of the taskError event, and the `failed`
// counter of at least one worker node.
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }

  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, eventPayload => {
    emittedTaskError = eventPayload
  })

  let rejection
  try {
    await asyncErrorPool.execute(payload)
  } catch (error) {
    rejection = error
  }

  // Rejection seen by the caller of execute().
  expect(rejection).toBeDefined()
  expect(rejection).toBeInstanceOf(Error)
  expect(rejection.message).toBeDefined()
  expect(typeof rejection.message).toBe('string')
  expect(rejection.message).toBe('Error Message from ThreadWorker:async')

  // Payload received by the taskError listener.
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: new Error('Error Message from ThreadWorker:async'),
    data: payload
  })

  // Failure accounted for on one of the worker nodes.
  const hasFailedTask = asyncErrorPool.workerNodes.some(
    node => node.usage.tasks.failed === 1
  )
  expect(hasFailedTask).toBe(true)
})
209
// The asyncWorker.js pool must resolve with the submitted data, and the
// round trip is expected to last at least 2000 ms.
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
218
// Destroys the shared pool. waitWorkerEvents() is called before destroy() and
// its result, awaited afterwards, must equal the number of threads.
it('Shutdown test', async () => {
  const exitEventsCount = waitWorkerEvents(pool, 'exit', numberOfThreads)
  await pool.destroy()
  expect(await exitEventsCount).toBe(numberOfThreads)
})
225
// Checks what ends up in `opts.workerOptions`: undefined when no options are
// given, and a value strictly equal to the worker options when they are.
// Each pool is destroyed before the next step.
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  // Builds a fresh object on each call so that the value handed to the pool
  // and the expected value are never the same reference.
  const buildWorkerOptions = () => ({
    env: { TEST: 'test' },
    name: 'test'
  })

  const defaultOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(defaultOptsPool.opts.workerOptions).toBeUndefined()
  await defaultOptsPool.destroy()

  const customOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: buildWorkerOptions()
  })
  expect(customOptsPool.opts.workerOptions).toStrictEqual(buildWorkerOptions())
  await customOptsPool.destroy()
})
243
// A pool built with only a size and a worker file (no options argument) must
// still execute a task, resolving here with { ok: 1 }.
it('Should work even without opts in input', async () => {
  const optionlessPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(await optionlessPool.execute()).toStrictEqual({ ok: 1 })
  // The pool is local to this test, so it is destroyed here.
  await optionlessPool.destroy()
})
8d3782fa
JB
254
// Constructing a fixed pool with a size of 0 must throw with the message
// asserted below.
it('Verify that a pool with zero worker fails', async () => {
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  expect(createZeroWorkerPool).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
506c2a14 260})